aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream/for_each_concurrent.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/stream/for_each_concurrent.rs')
-rw-r--r--src/stream/stream/for_each_concurrent.rs29
1 files changed, 16 insertions, 13 deletions
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs
index cee0ba1..6c18753 100644
--- a/src/stream/stream/for_each_concurrent.rs
+++ b/src/stream/stream/for_each_concurrent.rs
@@ -1,7 +1,7 @@
use crate::stream::{FuturesUnordered, StreamExt};
use core::fmt;
-use core::pin::Pin;
use core::num::NonZeroUsize;
+use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
@@ -35,9 +35,10 @@ where
}
impl<St, Fut, F> ForEachConcurrent<St, Fut, F>
-where St: Stream,
- F: FnMut(St::Item) -> Fut,
- Fut: Future<Output = ()>,
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = ()>,
{
pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
Self {
@@ -51,9 +52,10 @@ where St: Stream,
}
impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F>
- where St: Stream,
- F: FnMut(St::Item) -> Fut,
- Fut: Future<Output = ()>,
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = ()>,
{
fn is_terminated(&self) -> bool {
self.stream.is_none() && self.futures.is_empty()
@@ -61,9 +63,10 @@ impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F>
}
impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
- where St: Stream,
- F: FnMut(St::Item) -> Fut,
- Fut: Future<Output = ()>,
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = ()>,
{
type Output = ();
@@ -80,7 +83,7 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
Poll::Ready(Some(elem)) => {
made_progress_this_iter = true;
Some(elem)
- },
+ }
Poll::Ready(None) => {
stream_completed = true;
None
@@ -102,9 +105,9 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F>
Poll::Ready(Some(())) => made_progress_this_iter = true,
Poll::Ready(None) => {
if this.stream.is_none() {
- return Poll::Ready(())
+ return Poll::Ready(());
}
- },
+ }
Poll::Pending => {}
}