aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream/for_each_concurrent.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/stream/for_each_concurrent.rs')
-rw-r--r--src/stream/stream/for_each_concurrent.rs27
1 files changed, 14 insertions, 13 deletions
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs
index 843ddaa..cee0ba1 100644
--- a/src/stream/stream/for_each_concurrent.rs
+++ b/src/stream/stream/for_each_concurrent.rs
@@ -5,18 +5,19 @@ use core::num::NonZeroUsize;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent)
-/// method.
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct ForEachConcurrent<St, Fut, F> {
- #[pin]
- stream: Option<St>,
- f: F,
- futures: FuturesUnordered<Fut>,
- limit: Option<NonZeroUsize>,
+pin_project! {
+ /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent)
+ /// method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct ForEachConcurrent<St, Fut, F> {
+ #[pin]
+ stream: Option<St>,
+ f: F,
+ futures: FuturesUnordered<Fut>,
+ limit: Option<NonZeroUsize>,
+ }
}
impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F>
@@ -38,8 +39,8 @@ where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
- pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> {
- ForEachConcurrent {
+ pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
+ Self {
stream: Some(stream),
// Note: `limit` = 0 gets ignored.
limit: limit.and_then(NonZeroUsize::new),