aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/futures_unordered/mod.rs42
-rw-r--r--src/stream/futures_unordered/ready_to_run_queue.rs2
-rw-r--r--src/stream/futures_unordered/task.rs11
-rw-r--r--src/stream/stream/count.rs53
-rw-r--r--src/stream/stream/flatten_unordered.rs509
-rw-r--r--src/stream/stream/mod.rs162
-rw-r--r--src/stream/stream/scan.rs6
-rw-r--r--src/stream/try_stream/mod.rs7
8 files changed, 758 insertions, 34 deletions
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 4a05d88..fdbd53d 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -121,8 +121,9 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Weak::new(),
+ woken: AtomicBool::new(false),
});
- let stub_ptr = &*stub as *const Task<Fut>;
+ let stub_ptr = Arc::as_ptr(&stub);
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
waker: AtomicWaker::new(),
head: AtomicPtr::new(stub_ptr as *mut _),
@@ -167,6 +168,7 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
+ woken: AtomicBool::new(false),
});
// Reset the `is_terminated` flag if we've previously marked ourselves
@@ -375,7 +377,7 @@ impl<Fut> FuturesUnordered<Fut> {
// The `ReadyToRunQueue` stub is never inserted into the `head_all`
// list, and its pointer value will remain valid for the lifetime of
// this `FuturesUnordered`, so we can make use of its value here.
- &*self.ready_to_run_queue.stub as *const _ as *mut _
+ Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _
}
}
@@ -383,25 +385,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- // Variable to determine how many times it is allowed to poll underlying
- // futures without yielding.
- //
- // A single call to `poll_next` may potentially do a lot of work before
- // yielding. This happens in particular if the underlying futures are awoken
- // frequently but continue to return `Pending`. This is problematic if other
- // tasks are waiting on the executor, since they do not get to run. This value
- // caps the number of calls to `poll` on underlying futures a single call to
- // `poll_next` is allowed to make.
- //
- // The value is the length of FuturesUnordered. This ensures that each
- // future is polled only once at most per iteration.
- //
- // See also https://github.com/rust-lang/futures-rs/issues/2047.
- let yield_every = self.len();
+ let len = self.len();
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
+ let mut yielded = 0;
// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());
@@ -512,7 +501,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// the internal allocation, appropriately accessing fields and
// deallocating the task if need be.
let res = {
- let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
+ let task = bomb.task.as_ref().unwrap();
+ // We are only interested in whether the future is awoken before it
+ // finishes polling, so reset the flag here.
+ task.woken.store(false, Relaxed);
+ let waker = Task::waker_ref(task);
let mut cx = Context::from_waker(&waker);
// Safety: We won't move the future ever again
@@ -525,12 +518,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
+ // If the future was awoken during polling, we assume
+ // the future wanted to explicitly yield.
+ yielded += task.woken.load(Relaxed) as usize;
bomb.queue.link(task);
- if polled == yield_every {
- // We have polled a large number of futures in a row without yielding.
- // To ensure we do not starve other tasks waiting on the executor,
- // we yield here, but immediately wake ourselves up to continue.
+ // If a future yields, we respect it and yield here.
+ // If all futures have been polled, we also yield here to
+ // avoid starving other tasks waiting on the executor.
+ // (polling the same future twice per iteration may cause
+ // the problem: https://github.com/rust-lang/futures-rs/pull/2333)
+ if yielded >= 2 || polled == len {
cx.waker().wake_by_ref();
return Poll::Pending;
}
diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs
index 5ef6cde..4518705 100644
--- a/src/stream/futures_unordered/ready_to_run_queue.rs
+++ b/src/stream/futures_unordered/ready_to_run_queue.rs
@@ -83,7 +83,7 @@ impl<Fut> ReadyToRunQueue<Fut> {
}
pub(super) fn stub(&self) -> *const Task<Fut> {
- &*self.stub
+ Arc::as_ptr(&self.stub)
}
// Clear the queue of tasks.
diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs
index da2cd67..ec2114e 100644
--- a/src/stream/futures_unordered/task.rs
+++ b/src/stream/futures_unordered/task.rs
@@ -1,6 +1,6 @@
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
-use core::sync::atomic::Ordering::{self, SeqCst};
+use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};
use super::abort::abort;
@@ -31,6 +31,11 @@ pub(super) struct Task<Fut> {
// Whether or not this task is currently in the ready to run queue
pub(super) queued: AtomicBool,
+
+ // Whether the future was awoken during polling
+ // It is possible for this flag to be set to true after the polling,
+ // but it will be ignored.
+ pub(super) woken: AtomicBool,
}
// `Task` can be sent across threads safely because it ensures that
@@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> {
None => return,
};
+ arc_self.woken.store(true, Relaxed);
+
// It's our job to enqueue this task it into the ready to run queue. To
// do this we set the `queued` flag, and if successful we then do the
// actual queueing operation, ensuring that we're only queued once.
@@ -62,7 +69,7 @@ impl<Fut> ArcWake for Task<Fut> {
// still.
let prev = arc_self.queued.swap(true, SeqCst);
if !prev {
- inner.enqueue(&**arc_self);
+ inner.enqueue(Arc::as_ptr(arc_self));
inner.waker.wake();
}
}
diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs
new file mode 100644
index 0000000..513cab7
--- /dev/null
+++ b/src/stream/stream/count.rs
@@ -0,0 +1,53 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`count`](super::StreamExt::count) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Count<St> {
+ #[pin]
+ stream: St,
+ count: usize
+ }
+}
+
+impl<St> fmt::Debug for Count<St>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish()
+ }
+}
+
+impl<St: Stream> Count<St> {
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream, count: 0 }
+ }
+}
+
+impl<St: FusedStream> FusedFuture for Count<St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+impl<St: Stream> Future for Count<St> {
+ type Output = usize;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(_) => *this.count += 1,
+ None => break *this.count,
+ }
+ })
+ }
+}
diff --git a/src/stream/stream/flatten_unordered.rs b/src/stream/stream/flatten_unordered.rs
new file mode 100644
index 0000000..07f971c
--- /dev/null
+++ b/src/stream/stream/flatten_unordered.rs
@@ -0,0 +1,509 @@
+use alloc::sync::Arc;
+use core::{
+ cell::UnsafeCell,
+ convert::identity,
+ fmt,
+ num::NonZeroUsize,
+ pin::Pin,
+ sync::atomic::{AtomicU8, Ordering},
+};
+
+use pin_project_lite::pin_project;
+
+use futures_core::{
+ future::Future,
+ ready,
+ stream::{FusedStream, Stream},
+ task::{Context, Poll, Waker},
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use futures_task::{waker, ArcWake};
+
+use crate::stream::FuturesUnordered;
+
+/// There is nothing to poll and stream isn't being
+/// polled or waking at the moment.
+const NONE: u8 = 0;
+
+/// Inner streams need to be polled.
+const NEED_TO_POLL_INNER_STREAMS: u8 = 1;
+
+/// The base stream needs to be polled.
+const NEED_TO_POLL_STREAM: u8 = 0b10;
+
+/// It needs to poll base stream and inner streams.
+const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;
+
+/// The current stream is being polled at the moment.
+const POLLING: u8 = 0b100;
+
+/// Inner streams are being woken at the moment.
+const WAKING_INNER_STREAMS: u8 = 0b1000;
+
+/// The base stream is being woken at the moment.
+const WAKING_STREAM: u8 = 0b10000;
+
+/// The base stream and inner streams are being woken at the moment.
+const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;
+
+/// The stream was waked and will be polled.
+const WOKEN: u8 = 0b100000;
+
+/// Determines what needs to be polled, and is stream being polled at the
+/// moment or not.
+#[derive(Clone, Debug)]
+struct SharedPollState {
+ state: Arc<AtomicU8>,
+}
+
+impl SharedPollState {
+ /// Constructs new `SharedPollState` with the given state.
+ fn new(value: u8) -> SharedPollState {
+ SharedPollState { state: Arc::new(AtomicU8::new(value)) }
+ }
+
+ /// Attempts to start polling, returning stored state in case of success.
+ /// Returns `None` if some waker is waking at the moment.
+ fn start_polling(
+ &self,
+ ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ if value & WAKING_ALL == NONE {
+ Some(POLLING)
+ } else {
+ None
+ }
+ })
+ .ok()?;
+ let bomb = PollStateBomb::new(self, SharedPollState::reset);
+
+ Some((value, bomb))
+ }
+
+ /// Starts the waking process and performs bitwise or with the given value.
+ fn start_waking(
+ &self,
+ to_poll: u8,
+ waking: u8,
+ ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ // Waking process for this waker already started
+ if value & waking != NONE {
+ return None;
+ }
+ let mut next_value = value | to_poll;
+ // Only start the waking process if we're not in the polling phase and the stream isn't woken already
+ if value & (WOKEN | POLLING) == NONE {
+ next_value |= waking;
+ }
+
+ if next_value != value {
+ Some(next_value)
+ } else {
+ None
+ }
+ })
+ .ok()?;
+
+ if value & (WOKEN | POLLING) == NONE {
+ let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));
+
+ Some((value, bomb))
+ } else {
+ None
+ }
+ }
+
+ /// Sets current state to
+ /// - `!POLLING` allowing to use wakers
+ /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called,
+ /// or `will_be_woken` flag supplied
+ /// - `!WAKING_ALL` as
+ /// * Wakers called during the `POLLING` phase won't propagate their calls
+ /// * `POLLING` phase can't start if some of the wakers are active
+ /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
+ fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 {
+ self.state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| {
+ let mut next_value = to_poll;
+
+ value &= NEED_TO_POLL_ALL;
+ if value != NONE || will_be_woken {
+ next_value |= WOKEN;
+ }
+ next_value |= value;
+
+ Some(next_value & !POLLING & !WAKING_ALL)
+ })
+ .unwrap()
+ }
+
+ /// Toggles state to non-waking, allowing to start polling.
+ fn stop_waking(&self, waking: u8) -> u8 {
+ self.state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ let mut next_value = value & !waking;
+ // Waker will be called only if the current waking state is the same as the specified waker state
+ if value & WAKING_ALL == waking {
+ next_value |= WOKEN;
+ }
+
+ if next_value != value {
+ Some(next_value)
+ } else {
+ None
+ }
+ })
+ .unwrap_or_else(identity)
+ }
+
+ /// Resets current state allowing to poll the stream and wake up wakers.
+ fn reset(&self) -> u8 {
+ self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel)
+ }
+}
+
+/// Used to execute some function on the given state when dropped.
+struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> {
+ state: &'a SharedPollState,
+ drop: Option<F>,
+}
+
+impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> {
+ /// Constructs new bomb with the given state.
+ fn new(state: &'a SharedPollState, drop: F) -> Self {
+ Self { state, drop: Some(drop) }
+ }
+
+ /// Deactivates bomb, forces it to not call provided function when dropped.
+ fn deactivate(mut self) {
+ self.drop.take();
+ }
+
+ /// Manually fires the bomb, returning supplied state.
+ fn fire(mut self) -> Option<u8> {
+ self.drop.take().map(|drop| (drop)(self.state))
+ }
+}
+
+impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
+ fn drop(&mut self) {
+ if let Some(drop) = self.drop.take() {
+ (drop)(self.state);
+ }
+ }
+}
+
+/// Will update state with the provided value on `wake_by_ref` call
+/// and then, if there is a need, call `inner_waker`.
+struct InnerWaker {
+ inner_waker: UnsafeCell<Option<Waker>>,
+ poll_state: SharedPollState,
+ need_to_poll: u8,
+}
+
+unsafe impl Send for InnerWaker {}
+unsafe impl Sync for InnerWaker {}
+
+impl InnerWaker {
+ /// Replaces given waker's inner_waker for polling stream/futures which will
+ /// update poll state on `wake_by_ref` call. Use only if you need several
+ /// contexts.
+ ///
+ /// ## Safety
+ ///
+ /// This function will modify waker's `inner_waker` via `UnsafeCell`, so
+ /// it should be used only during `POLLING` phase.
+ unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) -> Waker {
+ *self_arc.inner_waker.get() = cx.waker().clone().into();
+ waker(self_arc.clone())
+ }
+
+ /// Attempts to start the waking process for the waker with the given value.
+ /// If succeeded, then the stream isn't yet woken and not being polled at the moment.
+ fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ self.poll_state.start_waking(self.need_to_poll, self.waking_state())
+ }
+
+ /// Returns the corresponding waking state toggled by this waker.
+ fn waking_state(&self) -> u8 {
+ self.need_to_poll << 3
+ }
+}
+
+impl ArcWake for InnerWaker {
+ fn wake_by_ref(self_arc: &Arc<Self>) {
+ if let Some((_, state_bomb)) = self_arc.start_waking() {
+ // Safety: now state is not `POLLING`
+ let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };
+
+ if let Some(inner_waker) = waker_opt.clone() {
+ // Stop waking to allow polling stream
+ let poll_state_value = state_bomb.fire().unwrap();
+
+ // Here we want to call waker only if stream isn't woken yet and
+ // also to optimize the case when two wakers are called at the same time.
+ //
+ // In this case the best strategy will be to propagate only the latest waker's awake,
+ // and then poll both entities in a single `poll_next` call
+ if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() {
+ // Wake up inner waker
+ inner_waker.wake();
+ }
+ }
+ }
+ }
+}
+
+pin_project! {
+ /// Future which contains optional stream.
+ ///
+ /// If it's `Some`, it will attempt to call `poll_next` on it,
+ /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))`
+ /// or `None` in case of `Poll::Ready(None)`.
+ ///
+ /// If `poll_next` will return `Poll::Pending`, it will be forwarded to
+ /// the future and current task will be notified by waker.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ struct PollStreamFut<St> {
+ #[pin]
+ stream: Option<St>,
+ }
+}
+
+impl<St> PollStreamFut<St> {
+ /// Constructs new `PollStreamFut` using given `stream`.
+ fn new(stream: impl Into<Option<St>>) -> Self {
+ Self { stream: stream.into() }
+ }
+}
+
+impl<St: Stream + Unpin> Future for PollStreamFut<St> {
+ type Output = Option<(St::Item, PollStreamFut<St>)>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut stream = self.project().stream;
+
+ let item = if let Some(stream) = stream.as_mut().as_pin_mut() {
+ ready!(stream.poll_next(cx))
+ } else {
+ None
+ };
+ let next_item_fut = PollStreamFut::new(stream.get_mut().take());
+ let out = item.map(|item| (item, next_item_fut));
+
+ Poll::Ready(out)
+ }
+}
+
+pin_project! {
+ /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
+ /// method.
+ #[project = FlattenUnorderedProj]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct FlattenUnordered<St> where St: Stream {
+ #[pin]
+ inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
+ #[pin]
+ stream: St,
+ poll_state: SharedPollState,
+ limit: Option<NonZeroUsize>,
+ is_stream_done: bool,
+ inner_streams_waker: Arc<InnerWaker>,
+ stream_waker: Arc<InnerWaker>,
+ }
+}
+
+impl<St> fmt::Debug for FlattenUnordered<St>
+where
+ St: Stream + fmt::Debug,
+ St::Item: Stream + fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FlattenUnordered")
+ .field("poll_state", &self.poll_state)
+ .field("inner_streams", &self.inner_streams)
+ .field("limit", &self.limit)
+ .field("stream", &self.stream)
+ .field("is_stream_done", &self.is_stream_done)
+ .finish()
+ }
+}
+
+impl<St> FlattenUnordered<St>
+where
+ St: Stream,
+ St::Item: Stream + Unpin,
+{
+ pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
+ let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);
+
+ FlattenUnordered {
+ inner_streams: FuturesUnordered::new(),
+ stream,
+ is_stream_done: false,
+ limit: limit.and_then(NonZeroUsize::new),
+ inner_streams_waker: Arc::new(InnerWaker {
+ inner_waker: UnsafeCell::new(None),
+ poll_state: poll_state.clone(),
+ need_to_poll: NEED_TO_POLL_INNER_STREAMS,
+ }),
+ stream_waker: Arc::new(InnerWaker {
+ inner_waker: UnsafeCell::new(None),
+ poll_state: poll_state.clone(),
+ need_to_poll: NEED_TO_POLL_STREAM,
+ }),
+ poll_state,
+ }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St> FlattenUnorderedProj<'_, St>
+where
+ St: Stream,
+{
+ /// Checks if current `inner_streams` size is less than optional limit.
+ fn is_exceeded_limit(&self) -> bool {
+ self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())
+ }
+}
+
+impl<St> FusedStream for FlattenUnordered<St>
+where
+ St: FusedStream,
+ St::Item: FusedStream + Unpin,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated() && self.inner_streams.is_empty()
+ }
+}
+
+impl<St> Stream for FlattenUnordered<St>
+where
+ St: Stream,
+ St::Item: Stream + Unpin,
+{
+ type Item = <St::Item as Stream>::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut next_item = None;
+ let mut need_to_poll_next = NONE;
+
+ let mut this = self.as_mut().project();
+
+ let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() {
+ Some(value) => value,
+ _ => {
+ // Waker was called, just wait for the next poll
+ return Poll::Pending;
+ }
+ };
+
+ if poll_state_value & NEED_TO_POLL_STREAM != NONE {
+ // Safety: now state is `POLLING`.
+ let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) };
+
+ // Here we need to poll the base stream.
+ //
+ // To improve performance, we will attempt to place as many items as we can
+ // to the `FuturesUnordered` bucket before polling inner streams
+ loop {
+ if this.is_exceeded_limit() || *this.is_stream_done {
+ // We either exceeded the limit or the stream is exhausted
+ if !*this.is_stream_done {
+ // The stream needs to be polled in the next iteration
+ need_to_poll_next |= NEED_TO_POLL_STREAM;
+ }
+
+ break;
+ } else {
+ match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) {
+ Poll::Ready(Some(inner_stream)) => {
+ // Add new stream to the inner streams bucket
+ this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream));
+ // Inner streams must be polled afterward
+ poll_state_value |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ Poll::Ready(None) => {
+ // Mark the stream as done
+ *this.is_stream_done = true;
+ }
+ Poll::Pending => {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE {
+ // Safety: now state is `POLLING`.
+ let inner_streams_waker =
+ unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) };
+
+ match this
+ .inner_streams
+ .as_mut()
+ .poll_next(&mut Context::from_waker(&inner_streams_waker))
+ {
+ Poll::Ready(Some(Some((item, next_item_fut)))) => {
+ // Push next inner stream item future to the list of inner streams futures
+ this.inner_streams.as_mut().push(next_item_fut);
+ // Take the received item
+ next_item = Some(item);
+ // On the next iteration, inner streams must be polled again
+ need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ Poll::Ready(Some(None)) => {
+ // On the next iteration, inner streams must be polled again
+ need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ _ => {}
+ }
+ }
+
+ // We didn't have any `poll_next` panic, so it's time to deactivate the bomb
+ state_bomb.deactivate();
+
+ let mut force_wake =
+ // we need to poll the stream and didn't reach the limit yet
+ need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit()
+ // or we need to poll inner streams again
+ || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE;
+
+ // Stop polling and swap the latest state
+ poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake);
+ // If state was changed during `POLLING` phase, need to manually call a waker
+ force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE;
+
+ let is_done = *this.is_stream_done && this.inner_streams.is_empty();
+
+ if next_item.is_some() || is_done {
+ Poll::Ready(next_item)
+ } else {
+ if force_wake {
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item> Sink<Item> for FlattenUnordered<St>
+where
+ St: Stream + Sink<Item>,
+{
+ type Error = St::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index 86997f4..642b91e 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -40,6 +40,10 @@ mod concat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::concat::Concat;
+mod count;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::count::Count;
+
mod cycle;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::cycle::Cycle;
@@ -195,6 +199,25 @@ pub use self::buffered::Buffered;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
+mod flatten_unordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)]
+pub use self::flatten_unordered::FlattenUnordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+delegate_all!(
+ /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
+ FlatMapUnordered<St, U, F>(
+ FlattenUnordered<Map<St, F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
+ where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
+);
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
mod for_each_concurrent;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
@@ -386,9 +409,9 @@ pub trait StreamExt: Stream {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
- /// let evens = stream.filter(|x| future::ready(x % 2 == 0));
+ /// let events = stream.filter(|x| future::ready(x % 2 == 0));
///
- /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await);
+ /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
/// # });
/// ```
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
@@ -418,11 +441,11 @@ pub trait StreamExt: Stream {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
- /// let evens = stream.filter_map(|x| async move {
+ /// let events = stream.filter_map(|x| async move {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
///
- /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
+ /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await);
/// # });
/// ```
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
@@ -576,6 +599,38 @@ pub trait StreamExt: Stream {
assert_future::<Self::Item, _>(Concat::new(self))
}
+ /// Drives the stream to completion, counting the number of items.
+ ///
+ /// # Overflow Behavior
+ ///
+ /// The method does no guarding against overflows, so counting elements of a
+ /// stream with more than [`usize::MAX`] elements either produces the wrong
+ /// result or panics. If debug assertions are enabled, a panic is guaranteed.
+ ///
+ /// # Panics
+ ///
+ /// This function might panic if the iterator has more than [`usize::MAX`]
+ /// elements.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=10);
+ /// let count = stream.count().await;
+ ///
+ /// assert_eq!(count, 10);
+ /// # });
+ /// ```
+ fn count(self) -> Count<Self>
+ where
+ Self: Sized,
+ {
+ assert_future::<usize, _>(Count::new(self))
+ }
+
/// Repeats a stream endlessly.
///
/// The stream never terminates. Note that you likely want to avoid
@@ -718,13 +773,57 @@ pub trait StreamExt: Stream {
assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
}
+ /// Flattens a stream of streams into just one continuous stream. Polls
+ /// inner streams concurrently.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::StreamExt;
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(1).unwrap();
+ /// tx1.unbounded_send(2).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(3).unwrap();
+ /// tx2.unbounded_send(4).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(rx1).unwrap();
+ /// tx3.unbounded_send(rx2).unwrap();
+ /// });
+ ///
+ /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
+ /// output.sort();
+ ///
+ /// assert_eq!(output, vec![1, 2, 3, 4]);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
+ where
+ Self::Item: Stream + Unpin,
+ Self: Sized,
+ {
+ FlattenUnordered::new(self, limit.into())
+ }
+
/// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
///
/// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
/// you would have to chain combinators like `.map(f).flatten()` while this
/// combinator provides ability to write `.flat_map(f)` instead of chaining.
///
- /// The provided closure which produce inner streams is executed over all elements
+ /// The provided closure which produces inner streams is executed over all elements
/// of stream as last inner stream is terminated and next stream item is available.
///
/// Note that this function consumes the stream passed into it and returns a
@@ -752,6 +851,59 @@ pub trait StreamExt: Stream {
assert_stream::<U::Item, _>(FlatMap::new(self, f))
}
+ /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
+ /// and polls them concurrently, yielding items in any order, as they made
+ /// available.
+ ///
+ /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
+ /// instead, and you need to poll all of them concurrently, you would
+ /// have to use something like `for_each_concurrent` and merge values
+ /// by hand. This combinator provides ability to collect all values
+ /// from concurrently polled streams into one stream.
+ ///
+ /// The first argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled concurrently. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// The provided closure which produces inner streams is executed over
+ /// all elements of stream as next stream item is available and limit
+ /// of concurrently processed streams isn't exceeded.
+ ///
+ /// Note that this function consumes the stream passed into it and
+ /// returns a wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..5);
+ /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
+ /// let mut values = stream.collect::<Vec<_>>().await;
+ /// values.sort();
+ ///
+ /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn flat_map_unordered<U, F>(
+ self,
+ limit: impl Into<Option<usize>>,
+ f: F,
+ ) -> FlatMapUnordered<Self, U, F>
+ where
+ U: Stream + Unpin,
+ F: FnMut(Self::Item) -> U,
+ Self: Sized,
+ {
+ FlatMapUnordered::new(self, limit.into(), f)
+ }
+
/// Combinator similar to [`StreamExt::fold`] that holds internal state
/// and produces a new stream.
///
diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs
index 8724145..f5cfde9 100644
--- a/src/stream/stream/scan.rs
+++ b/src/stream/stream/scan.rs
@@ -118,11 +118,11 @@ where
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
-impl<S, Fut, F, Item> Sink<Item> for Scan<S, S, Fut, F>
+impl<St, S, Fut, F, Item> Sink<Item> for Scan<St, S, Fut, F>
where
- S: Stream + Sink<Item>,
+ St: Stream + Sink<Item>,
{
- type Error = S::Error;
+ type Error = St::Error;
delegate_sink!(stream, Item);
}
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 455ddca..6bf2cb7 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -736,17 +736,21 @@ pub trait TryStreamExt: TryStream {
/// thread::spawn(move || {
/// tx2.unbounded_send(Ok(2)).unwrap();
/// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
/// });
/// thread::spawn(move || {
/// tx3.unbounded_send(Ok(rx1)).unwrap();
/// tx3.unbounded_send(Ok(rx2)).unwrap();
- /// tx3.unbounded_send(Err(4)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
/// });
///
/// let mut stream = rx3.try_flatten();
/// assert_eq!(stream.next().await, Some(Ok(1)));
/// assert_eq!(stream.next().await, Some(Ok(2)));
/// assert_eq!(stream.next().await, Some(Err(3)));
+ /// assert_eq!(stream.next().await, Some(Ok(4)));
+ /// assert_eq!(stream.next().await, Some(Err(5)));
+ /// assert_eq!(stream.next().await, None);
/// # });
/// ```
fn try_flatten(self) -> TryFlatten<Self>
@@ -1001,6 +1005,7 @@ pub trait TryStreamExt: TryStream {
/// Wraps a [`TryStream`] into a stream compatible with libraries using
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
/// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
/// use futures::future::{FutureExt, TryFutureExt};
/// # let (tx, rx) = futures::channel::oneshot::channel();
///