diff options
Diffstat (limited to 'src/stream/try_stream/try_for_each_concurrent.rs')
-rw-r--r-- | src/stream/try_stream/try_for_each_concurrent.rs | 42 |
1 files changed, 17 insertions, 25 deletions
diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs index 19c3e5b..87fd465 100644 --- a/src/stream/try_stream/try_for_each_concurrent.rs +++ b/src/stream/try_stream/try_for_each_concurrent.rs @@ -6,24 +6,21 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use pin_project::{pin_project, project}; /// Future for the /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) /// method. +#[pin_project] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TryForEachConcurrent<St, Fut, F> { + #[pin] stream: Option<St>, f: F, futures: FuturesUnordered<Fut>, limit: Option<NonZeroUsize>, } -impl<St, Fut, F> Unpin for TryForEachConcurrent<St, Fut, F> -where St: Unpin, - Fut: Unpin, -{} - impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F> where St: fmt::Debug, @@ -53,11 +50,6 @@ where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: Future<Output = Result<(), St::Error>>, { - unsafe_pinned!(stream: Option<St>); - unsafe_unpinned!(f: F); - unsafe_unpinned!(futures: FuturesUnordered<Fut>); - unsafe_unpinned!(limit: Option<NonZeroUsize>); - pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> { TryForEachConcurrent { stream: Some(stream), @@ -76,15 +68,16 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> { type Output = Result<(), St::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + #[project] + let TryForEachConcurrent { mut stream, f, futures, limit } = self.project(); loop { let mut made_progress_this_iter = false; - // Try and pull an item from the stream - let current_len = self.futures.len(); // Check if we've already created a number of futures greater than `limit` - if self.limit.map(|limit| limit.get() > current_len).unwrap_or(true) { - let poll_res = match self.as_mut().stream().as_pin_mut() { + if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) { + let poll_res = match stream.as_mut().as_pin_mut() { Some(stream) => stream.try_poll_next(cx), None => Poll::Ready(None), }; @@ -95,29 +88,28 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Some(elem) }, Poll::Ready(None) => { - self.as_mut().stream().set(None); + stream.set(None); None } Poll::Pending => None, Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - self.as_mut().stream().set(None); - drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new())); + stream.set(None); + drop(mem::replace(futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } }; if let Some(elem) = elem { - let next_future = (self.as_mut().f())(elem); - self.as_mut().futures().push(next_future); + futures.push(f(elem)); } } - match self.as_mut().futures().poll_next_unpin(cx) { + match futures.poll_next_unpin(cx) { Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, Poll::Ready(None) => { - if self.stream.is_none() { + if stream.is_none() { return Poll::Ready(Ok(())) } }, @@ -125,8 +117,8 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know // the future has completed. - self.as_mut().stream().set(None); - drop(mem::replace(self.as_mut().futures(), FuturesUnordered::new())); + stream.set(None); + drop(mem::replace(futures, FuturesUnordered::new())); return Poll::Ready(Err(e)); } } |