diff options
Diffstat (limited to 'src/stream_ext/any.rs')
-rw-r--r-- | src/stream_ext/any.rs | 21 |
1 files changed, 12 insertions, 9 deletions
diff --git a/src/stream_ext/any.rs b/src/stream_ext/any.rs index 4c4c593..31394f2 100644 --- a/src/stream_ext/any.rs +++ b/src/stream_ext/any.rs @@ -38,18 +38,21 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if (me.f)(v) { - Poll::Ready(true) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if (me.f)(v) { + return Poll::Ready(true); + } } + None => return Poll::Ready(false), } - None => Poll::Ready(false), } + + cx.waker().wake_by_ref(); + Poll::Pending } } |