diff options
Diffstat (limited to 'src/stream/stream/for_each_concurrent.rs')
-rw-r--r-- | src/stream/stream/for_each_concurrent.rs | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs index cee0ba1..6c18753 100644 --- a/src/stream/stream/for_each_concurrent.rs +++ b/src/stream/stream/for_each_concurrent.rs @@ -1,7 +1,7 @@ use crate::stream::{FuturesUnordered, StreamExt}; use core::fmt; -use core::pin::Pin; use core::num::NonZeroUsize; +use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -35,9 +35,10 @@ where } impl<St, Fut, F> ForEachConcurrent<St, Fut, F> -where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { Self { @@ -51,9 +52,10 @@ where St: Stream, } impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F> - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() @@ -61,9 +63,10 @@ impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F> } impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { type Output = (); @@ -80,7 +83,7 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> Poll::Ready(Some(elem)) => { made_progress_this_iter = true; Some(elem) - }, + } Poll::Ready(None) => { stream_completed = true; None @@ -102,9 +105,9 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> Poll::Ready(Some(())) => made_progress_this_iter = true, Poll::Ready(None) => { if this.stream.is_none() { - return Poll::Ready(()) + return Poll::Ready(()); } - }, + } Poll::Pending => {} } |