diff options
Diffstat (limited to 'src/stream/stream/for_each_concurrent.rs')
-rw-r--r-- | src/stream/stream/for_each_concurrent.rs | 27 |
1 files changed, 14 insertions, 13 deletions
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs index 843ddaa..cee0ba1 100644 --- a/src/stream/stream/for_each_concurrent.rs +++ b/src/stream/stream/for_each_concurrent.rs @@ -5,18 +5,19 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) -/// method. -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ForEachConcurrent<St, Fut, F> { - #[pin] - stream: Option<St>, - f: F, - futures: FuturesUnordered<Fut>, - limit: Option<NonZeroUsize>, +pin_project! { + /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) + /// method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct ForEachConcurrent<St, Fut, F> { + #[pin] + stream: Option<St>, + f: F, + futures: FuturesUnordered<Fut>, + limit: Option<NonZeroUsize>, + } } impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F> @@ -38,8 +39,8 @@ where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future<Output = ()>, { - pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> { - ForEachConcurrent { + pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { + Self { stream: Some(stream), // Note: `limit` = 0 gets ignored. limit: limit.and_then(NonZeroUsize::new), |