summaryrefslogtreecommitdiff
path: root/src/stream_ext/any.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream_ext/any.rs')
-rw-r--r--src/stream_ext/any.rs21
1 files changed, 12 insertions, 9 deletions
diff --git a/src/stream_ext/any.rs b/src/stream_ext/any.rs
index 4c4c593..31394f2 100644
--- a/src/stream_ext/any.rs
+++ b/src/stream_ext/any.rs
@@ -38,18 +38,21 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
- let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
+ let mut stream = Pin::new(me.stream);
- match next {
- Some(v) => {
- if (me.f)(v) {
- Poll::Ready(true)
- } else {
- cx.waker().wake_by_ref();
- Poll::Pending
+ // Take a maximum of 32 items from the stream before yielding.
+ for _ in 0..32 {
+ match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+ Some(v) => {
+ if (me.f)(v) {
+ return Poll::Ready(true);
+ }
}
+ None => return Poll::Ready(false),
}
- None => Poll::Ready(false),
}
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
}
}