diff options
Diffstat (limited to 'src/stream')
-rw-r--r-- | src/stream/futures_unordered/mod.rs | 42 | ||||
-rw-r--r-- | src/stream/futures_unordered/ready_to_run_queue.rs | 2 | ||||
-rw-r--r-- | src/stream/futures_unordered/task.rs | 11 | ||||
-rw-r--r-- | src/stream/stream/count.rs | 53 | ||||
-rw-r--r-- | src/stream/stream/flatten_unordered.rs | 509 | ||||
-rw-r--r-- | src/stream/stream/mod.rs | 162 | ||||
-rw-r--r-- | src/stream/stream/scan.rs | 6 | ||||
-rw-r--r-- | src/stream/try_stream/mod.rs | 7 |
8 files changed, 758 insertions, 34 deletions
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 4a05d88..fdbd53d 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -121,8 +121,9 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Weak::new(), + woken: AtomicBool::new(false), }); - let stub_ptr = &*stub as *const Task<Fut>; + let stub_ptr = Arc::as_ptr(&stub); let ready_to_run_queue = Arc::new(ReadyToRunQueue { waker: AtomicWaker::new(), head: AtomicPtr::new(stub_ptr as *mut _), @@ -167,6 +168,7 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), + woken: AtomicBool::new(false), }); // Reset the `is_terminated` flag if we've previously marked ourselves @@ -375,7 +377,7 @@ impl<Fut> FuturesUnordered<Fut> { // The `ReadyToRunQueue` stub is never inserted into the `head_all` // list, and its pointer value will remain valid for the lifetime of // this `FuturesUnordered`, so we can make use of its value here. - &*self.ready_to_run_queue.stub as *const _ as *mut _ + Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _ } } @@ -383,25 +385,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - // Variable to determine how many times it is allowed to poll underlying - // futures without yielding. - // - // A single call to `poll_next` may potentially do a lot of work before - // yielding. This happens in particular if the underlying futures are awoken - // frequently but continue to return `Pending`. This is problematic if other - // tasks are waiting on the executor, since they do not get to run. This value - // caps the number of calls to `poll` on underlying futures a single call to - // `poll_next` is allowed to make. - // - // The value is the length of FuturesUnordered. This ensures that each - // future is polled only once at most per iteration. - // - // See also https://github.com/rust-lang/futures-rs/issues/2047. - let yield_every = self.len(); + let len = self.len(); // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; + let mut yielded = 0; // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -512,7 +501,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { // the internal allocation, appropriately accessing fields and // deallocating the task if need be. let res = { - let waker = Task::waker_ref(bomb.task.as_ref().unwrap()); + let task = bomb.task.as_ref().unwrap(); + // We are only interested in whether the future is awoken before it + // finishes polling, so reset the flag here. + task.woken.store(false, Relaxed); + let waker = Task::waker_ref(task); let mut cx = Context::from_waker(&waker); // Safety: We won't move the future ever again @@ -525,12 +518,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { match res { Poll::Pending => { let task = bomb.task.take().unwrap(); + // If the future was awoken during polling, we assume + // the future wanted to explicitly yield. + yielded += task.woken.load(Relaxed) as usize; bomb.queue.link(task); - if polled == yield_every { - // We have polled a large number of futures in a row without yielding. - // To ensure we do not starve other tasks waiting on the executor, - // we yield here, but immediately wake ourselves up to continue. + // If a future yields, we respect it and yield here. + // If all futures have been polled, we also yield here to + // avoid starving other tasks waiting on the executor. + // (polling the same future twice per iteration may cause + // the problem: https://github.com/rust-lang/futures-rs/pull/2333) + if yielded >= 2 || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs index 5ef6cde..4518705 100644 --- a/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/src/stream/futures_unordered/ready_to_run_queue.rs @@ -83,7 +83,7 @@ impl<Fut> ReadyToRunQueue<Fut> { } pub(super) fn stub(&self) -> *const Task<Fut> { - &*self.stub + Arc::as_ptr(&self.stub) } // Clear the queue of tasks. diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs index da2cd67..ec2114e 100644 --- a/src/stream/futures_unordered/task.rs +++ b/src/stream/futures_unordered/task.rs @@ -1,6 +1,6 @@ use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::sync::atomic::Ordering::{self, SeqCst}; +use core::sync::atomic::Ordering::{self, Relaxed, SeqCst}; use core::sync::atomic::{AtomicBool, AtomicPtr}; use super::abort::abort; @@ -31,6 +31,11 @@ pub(super) struct Task<Fut> { // Whether or not this task is currently in the ready to run queue pub(super) queued: AtomicBool, + + // Whether the future was awoken during polling + // It is possible for this flag to be set to true after the polling, + // but it will be ignored. + pub(super) woken: AtomicBool, } // `Task` can be sent across threads safely because it ensures that @@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> { None => return, }; + arc_self.woken.store(true, Relaxed); + // It's our job to enqueue this task it into the ready to run queue. To // do this we set the `queued` flag, and if successful we then do the // actual queueing operation, ensuring that we're only queued once. @@ -62,7 +69,7 @@ impl<Fut> ArcWake for Task<Fut> { // still. let prev = arc_self.queued.swap(true, SeqCst); if !prev { - inner.enqueue(&**arc_self); + inner.enqueue(Arc::as_ptr(arc_self)); inner.waker.wake(); } } diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs new file mode 100644 index 0000000..513cab7 --- /dev/null +++ b/src/stream/stream/count.rs @@ -0,0 +1,53 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`count`](super::StreamExt::count) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Count<St> { + #[pin] + stream: St, + count: usize + } +} + +impl<St> fmt::Debug for Count<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish() + } +} + +impl<St: Stream> Count<St> { + pub(super) fn new(stream: St) -> Self { + Self { stream, count: 0 } + } +} + +impl<St: FusedStream> FusedFuture for Count<St> { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +impl<St: Stream> Future for Count<St> { + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + Poll::Ready(loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(_) => *this.count += 1, + None => break *this.count, + } + }) + } +} diff --git a/src/stream/stream/flatten_unordered.rs b/src/stream/stream/flatten_unordered.rs new file mode 100644 index 0000000..07f971c --- /dev/null +++ b/src/stream/stream/flatten_unordered.rs @@ -0,0 +1,509 @@ +use alloc::sync::Arc; +use core::{ + cell::UnsafeCell, + convert::identity, + fmt, + num::NonZeroUsize, + pin::Pin, + sync::atomic::{AtomicU8, Ordering}, +}; + +use pin_project_lite::pin_project; + +use futures_core::{ + future::Future, + ready, + stream::{FusedStream, Stream}, + task::{Context, Poll, Waker}, +}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use futures_task::{waker, ArcWake}; + +use crate::stream::FuturesUnordered; + +/// There is nothing to poll and stream isn't being +/// polled or waking at the moment. +const NONE: u8 = 0; + +/// Inner streams need to be polled. +const NEED_TO_POLL_INNER_STREAMS: u8 = 1; + +/// The base stream needs to be polled. +const NEED_TO_POLL_STREAM: u8 = 0b10; + +/// It needs to poll base stream and inner streams. +const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM; + +/// The current stream is being polled at the moment. +const POLLING: u8 = 0b100; + +/// Inner streams are being woken at the moment. +const WAKING_INNER_STREAMS: u8 = 0b1000; + +/// The base stream is being woken at the moment. +const WAKING_STREAM: u8 = 0b10000; + +/// The base stream and inner streams are being woken at the moment. +const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS; + +/// The stream was waked and will be polled. +const WOKEN: u8 = 0b100000; + +/// Determines what needs to be polled, and is stream being polled at the +/// moment or not. +#[derive(Clone, Debug)] +struct SharedPollState { + state: Arc<AtomicU8>, +} + +impl SharedPollState { + /// Constructs new `SharedPollState` with the given state. + fn new(value: u8) -> SharedPollState { + SharedPollState { state: Arc::new(AtomicU8::new(value)) } + } + + /// Attempts to start polling, returning stored state in case of success. + /// Returns `None` if some waker is waking at the moment. + fn start_polling( + &self, + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + if value & WAKING_ALL == NONE { + Some(POLLING) + } else { + None + } + }) + .ok()?; + let bomb = PollStateBomb::new(self, SharedPollState::reset); + + Some((value, bomb)) + } + + /// Starts the waking process and performs bitwise or with the given value. + fn start_waking( + &self, + to_poll: u8, + waking: u8, + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + // Waking process for this waker already started + if value & waking != NONE { + return None; + } + let mut next_value = value | to_poll; + // Only start the waking process if we're not in the polling phase and the stream isn't woken already + if value & (WOKEN | POLLING) == NONE { + next_value |= waking; + } + + if next_value != value { + Some(next_value) + } else { + None + } + }) + .ok()?; + + if value & (WOKEN | POLLING) == NONE { + let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking)); + + Some((value, bomb)) + } else { + None + } + } + + /// Sets current state to + /// - `!POLLING` allowing to use wakers + /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called, + /// or `will_be_woken` flag supplied + /// - `!WAKING_ALL` as + /// * Wakers called during the `POLLING` phase won't propagate their calls + /// * `POLLING` phase can't start if some of the wakers are active + /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. + fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| { + let mut next_value = to_poll; + + value &= NEED_TO_POLL_ALL; + if value != NONE || will_be_woken { + next_value |= WOKEN; + } + next_value |= value; + + Some(next_value & !POLLING & !WAKING_ALL) + }) + .unwrap() + } + + /// Toggles state to non-waking, allowing to start polling. + fn stop_waking(&self, waking: u8) -> u8 { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + let mut next_value = value & !waking; + // Waker will be called only if the current waking state is the same as the specified waker state + if value & WAKING_ALL == waking { + next_value |= WOKEN; + } + + if next_value != value { + Some(next_value) + } else { + None + } + }) + .unwrap_or_else(identity) + } + + /// Resets current state allowing to poll the stream and wake up wakers. + fn reset(&self) -> u8 { + self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel) + } +} + +/// Used to execute some function on the given state when dropped. +struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> { + state: &'a SharedPollState, + drop: Option<F>, +} + +impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> { + /// Constructs new bomb with the given state. + fn new(state: &'a SharedPollState, drop: F) -> Self { + Self { state, drop: Some(drop) } + } + + /// Deactivates bomb, forces it to not call provided function when dropped. + fn deactivate(mut self) { + self.drop.take(); + } + + /// Manually fires the bomb, returning supplied state. + fn fire(mut self) -> Option<u8> { + self.drop.take().map(|drop| (drop)(self.state)) + } +} + +impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> { + fn drop(&mut self) { + if let Some(drop) = self.drop.take() { + (drop)(self.state); + } + } +} + +/// Will update state with the provided value on `wake_by_ref` call +/// and then, if there is a need, call `inner_waker`. +struct InnerWaker { + inner_waker: UnsafeCell<Option<Waker>>, + poll_state: SharedPollState, + need_to_poll: u8, +} + +unsafe impl Send for InnerWaker {} +unsafe impl Sync for InnerWaker {} + +impl InnerWaker { + /// Replaces given waker's inner_waker for polling stream/futures which will + /// update poll state on `wake_by_ref` call. Use only if you need several + /// contexts. + /// + /// ## Safety + /// + /// This function will modify waker's `inner_waker` via `UnsafeCell`, so + /// it should be used only during `POLLING` phase. + unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) -> Waker { + *self_arc.inner_waker.get() = cx.waker().clone().into(); + waker(self_arc.clone()) + } + + /// Attempts to start the waking process for the waker with the given value. + /// If succeeded, then the stream isn't yet woken and not being polled at the moment. + fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + self.poll_state.start_waking(self.need_to_poll, self.waking_state()) + } + + /// Returns the corresponding waking state toggled by this waker. + fn waking_state(&self) -> u8 { + self.need_to_poll << 3 + } +} + +impl ArcWake for InnerWaker { + fn wake_by_ref(self_arc: &Arc<Self>) { + if let Some((_, state_bomb)) = self_arc.start_waking() { + // Safety: now state is not `POLLING` + let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() }; + + if let Some(inner_waker) = waker_opt.clone() { + // Stop waking to allow polling stream + let poll_state_value = state_bomb.fire().unwrap(); + + // Here we want to call waker only if stream isn't woken yet and + // also to optimize the case when two wakers are called at the same time. + // + // In this case the best strategy will be to propagate only the latest waker's awake, + // and then poll both entities in a single `poll_next` call + if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() { + // Wake up inner waker + inner_waker.wake(); + } + } + } + } +} + +pin_project! { + /// Future which contains optional stream. + /// + /// If it's `Some`, it will attempt to call `poll_next` on it, + /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))` + /// or `None` in case of `Poll::Ready(None)`. + /// + /// If `poll_next` will return `Poll::Pending`, it will be forwarded to + /// the future and current task will be notified by waker. + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct PollStreamFut<St> { + #[pin] + stream: Option<St>, + } +} + +impl<St> PollStreamFut<St> { + /// Constructs new `PollStreamFut` using given `stream`. + fn new(stream: impl Into<Option<St>>) -> Self { + Self { stream: stream.into() } + } +} + +impl<St: Stream + Unpin> Future for PollStreamFut<St> { + type Output = Option<(St::Item, PollStreamFut<St>)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut stream = self.project().stream; + + let item = if let Some(stream) = stream.as_mut().as_pin_mut() { + ready!(stream.poll_next(cx)) + } else { + None + }; + let next_item_fut = PollStreamFut::new(stream.get_mut().take()); + let out = item.map(|item| (item, next_item_fut)); + + Poll::Ready(out) + } +} + +pin_project! { + /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) + /// method. + #[project = FlattenUnorderedProj] + #[must_use = "streams do nothing unless polled"] + pub struct FlattenUnordered<St> where St: Stream { + #[pin] + inner_streams: FuturesUnordered<PollStreamFut<St::Item>>, + #[pin] + stream: St, + poll_state: SharedPollState, + limit: Option<NonZeroUsize>, + is_stream_done: bool, + inner_streams_waker: Arc<InnerWaker>, + stream_waker: Arc<InnerWaker>, + } +} + +impl<St> fmt::Debug for FlattenUnordered<St> +where + St: Stream + fmt::Debug, + St::Item: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlattenUnordered") + .field("poll_state", &self.poll_state) + .field("inner_streams", &self.inner_streams) + .field("limit", &self.limit) + .field("stream", &self.stream) + .field("is_stream_done", &self.is_stream_done) + .finish() + } +} + +impl<St> FlattenUnordered<St> +where + St: Stream, + St::Item: Stream + Unpin, +{ + pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> { + let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM); + + FlattenUnordered { + inner_streams: FuturesUnordered::new(), + stream, + is_stream_done: false, + limit: limit.and_then(NonZeroUsize::new), + inner_streams_waker: Arc::new(InnerWaker { + inner_waker: UnsafeCell::new(None), + poll_state: poll_state.clone(), + need_to_poll: NEED_TO_POLL_INNER_STREAMS, + }), + stream_waker: Arc::new(InnerWaker { + inner_waker: UnsafeCell::new(None), + poll_state: poll_state.clone(), + need_to_poll: NEED_TO_POLL_STREAM, + }), + poll_state, + } + } + + delegate_access_inner!(stream, St, ()); +} + +impl<St> FlattenUnorderedProj<'_, St> +where + St: Stream, +{ + /// Checks if current `inner_streams` size is less than optional limit. + fn is_exceeded_limit(&self) -> bool { + self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get()) + } +} + +impl<St> FusedStream for FlattenUnordered<St> +where + St: FusedStream, + St::Item: FusedStream + Unpin, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.inner_streams.is_empty() + } +} + +impl<St> Stream for FlattenUnordered<St> +where + St: Stream, + St::Item: Stream + Unpin, +{ + type Item = <St::Item as Stream>::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut next_item = None; + let mut need_to_poll_next = NONE; + + let mut this = self.as_mut().project(); + + let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() { + Some(value) => value, + _ => { + // Waker was called, just wait for the next poll + return Poll::Pending; + } + }; + + if poll_state_value & NEED_TO_POLL_STREAM != NONE { + // Safety: now state is `POLLING`. + let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) }; + + // Here we need to poll the base stream. + // + // To improve performance, we will attempt to place as many items as we can + // to the `FuturesUnordered` bucket before polling inner streams + loop { + if this.is_exceeded_limit() || *this.is_stream_done { + // We either exceeded the limit or the stream is exhausted + if !*this.is_stream_done { + // The stream needs to be polled in the next iteration + need_to_poll_next |= NEED_TO_POLL_STREAM; + } + + break; + } else { + match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) { + Poll::Ready(Some(inner_stream)) => { + // Add new stream to the inner streams bucket + this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream)); + // Inner streams must be polled afterward + poll_state_value |= NEED_TO_POLL_INNER_STREAMS; + } + Poll::Ready(None) => { + // Mark the stream as done + *this.is_stream_done = true; + } + Poll::Pending => { + break; + } + } + } + } + } + + if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE { + // Safety: now state is `POLLING`. + let inner_streams_waker = + unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) }; + + match this + .inner_streams + .as_mut() + .poll_next(&mut Context::from_waker(&inner_streams_waker)) + { + Poll::Ready(Some(Some((item, next_item_fut)))) => { + // Push next inner stream item future to the list of inner streams futures + this.inner_streams.as_mut().push(next_item_fut); + // Take the received item + next_item = Some(item); + // On the next iteration, inner streams must be polled again + need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS; + } + Poll::Ready(Some(None)) => { + // On the next iteration, inner streams must be polled again + need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS; + } + _ => {} + } + } + + // We didn't have any `poll_next` panic, so it's time to deactivate the bomb + state_bomb.deactivate(); + + let mut force_wake = + // we need to poll the stream and didn't reach the limit yet + need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() + // or we need to poll inner streams again + || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE; + + // Stop polling and swap the latest state + poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake); + // If state was changed during `POLLING` phase, need to manually call a waker + force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE; + + let is_done = *this.is_stream_done && this.inner_streams.is_empty(); + + if next_item.is_some() || is_done { + Poll::Ready(next_item) + } else { + if force_wake { + cx.waker().wake_by_ref(); + } + + Poll::Pending + } + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<St, Item> Sink<Item> for FlattenUnordered<St> +where + St: Stream + Sink<Item>, +{ + type Error = St::Error; + + delegate_sink!(stream, Item); +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 86997f4..642b91e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -40,6 +40,10 @@ mod concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::concat::Concat; +mod count; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::count::Count; + mod cycle; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::cycle::Cycle; @@ -195,6 +199,25 @@ pub use self::buffered::Buffered; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] +mod flatten_unordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] +pub use self::flatten_unordered::FlattenUnordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +delegate_all!( + /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method. + FlatMapUnordered<St, U, F>( + FlattenUnordered<Map<St, F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)] + where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U +); + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] mod for_each_concurrent; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] @@ -386,9 +409,9 @@ pub trait StreamExt: Stream { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(1..=10); - /// let evens = stream.filter(|x| future::ready(x % 2 == 0)); + /// let events = stream.filter(|x| future::ready(x % 2 == 0)); /// - /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await); + /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await); /// # }); /// ``` fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> @@ -418,11 +441,11 @@ pub trait StreamExt: Stream { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(1..=10); - /// let evens = stream.filter_map(|x| async move { + /// let events = stream.filter_map(|x| async move { /// if x % 2 == 0 { Some(x + 1) } else { None } /// }); /// - /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await); + /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await); /// # }); /// ``` fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> @@ -576,6 +599,38 @@ pub trait StreamExt: Stream { assert_future::<Self::Item, _>(Concat::new(self)) } + /// Drives the stream to completion, counting the number of items. + /// + /// # Overflow Behavior + /// + /// The method does no guarding against overflows, so counting elements of a + /// stream with more than [`usize::MAX`] elements either produces the wrong + /// result or panics. If debug assertions are enabled, a panic is guaranteed. + /// + /// # Panics + /// + /// This function might panic if the iterator has more than [`usize::MAX`] + /// elements. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let count = stream.count().await; + /// + /// assert_eq!(count, 10); + /// # }); + /// ``` + fn count(self) -> Count<Self> + where + Self: Sized, + { + assert_future::<usize, _>(Count::new(self)) + } + /// Repeats a stream endlessly. /// /// The stream never terminates. Note that you likely want to avoid @@ -718,13 +773,57 @@ pub trait StreamExt: Stream { assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) } + /// Flattens a stream of streams into just one continuous stream. Polls + /// inner streams concurrently. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::StreamExt; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(1).unwrap(); + /// tx1.unbounded_send(2).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(3).unwrap(); + /// tx2.unbounded_send(4).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(rx1).unwrap(); + /// tx3.unbounded_send(rx2).unwrap(); + /// }); + /// + /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await; + /// output.sort(); + /// + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> + where + Self::Item: Stream + Unpin, + Self: Sized, + { + FlattenUnordered::new(self, limit.into()) + } + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. /// /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, /// you would have to chain combinators like `.map(f).flatten()` while this /// combinator provides ability to write `.flat_map(f)` instead of chaining. /// - /// The provided closure which produce inner streams is executed over all elements + /// The provided closure which produces inner streams is executed over all elements /// of stream as last inner stream is terminated and next stream item is available. /// /// Note that this function consumes the stream passed into it and returns a @@ -752,6 +851,59 @@ pub trait StreamExt: Stream { assert_stream::<U::Item, _>(FlatMap::new(self, f)) } + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s + /// and polls them concurrently, yielding items in any order, as they made + /// available. + /// + /// [`StreamExt::map`] is very useful, but if it produces `Stream`s + /// instead, and you need to poll all of them concurrently, you would + /// have to use something like `for_each_concurrent` and merge values + /// by hand. This combinator provides ability to collect all values + /// from concurrently polled streams into one stream. + /// + /// The first argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled concurrently. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// The provided closure which produces inner streams is executed over + /// all elements of stream as next stream item is available and limit + /// of concurrently processed streams isn't exceeded. + /// + /// Note that this function consumes the stream passed into it and + /// returns a wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..5); + /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x])); + /// let mut values = stream.collect::<Vec<_>>().await; + /// values.sort(); + /// + /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn flat_map_unordered<U, F>( + self, + limit: impl Into<Option<usize>>, + f: F, + ) -> FlatMapUnordered<Self, U, F> + where + U: Stream + Unpin, + F: FnMut(Self::Item) -> U, + Self: Sized, + { + FlatMapUnordered::new(self, limit.into(), f) + } + /// Combinator similar to [`StreamExt::fold`] that holds internal state /// and produces a new stream. /// diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index 8724145..f5cfde9 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -118,11 +118,11 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl<S, Fut, F, Item> Sink<Item> for Scan<S, S, Fut, F> +impl<St, S, Fut, F, Item> Sink<Item> for Scan<St, S, Fut, F> where - S: Stream + Sink<Item>, + St: Stream + Sink<Item>, { - type Error = S::Error; + type Error = St::Error; delegate_sink!(stream, Item); } diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 455ddca..6bf2cb7 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -736,17 +736,21 @@ pub trait TryStreamExt: TryStream { /// thread::spawn(move || { /// tx2.unbounded_send(Ok(2)).unwrap(); /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); /// }); /// thread::spawn(move || { /// tx3.unbounded_send(Ok(rx1)).unwrap(); /// tx3.unbounded_send(Ok(rx2)).unwrap(); - /// tx3.unbounded_send(Err(4)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); /// }); /// /// let mut stream = rx3.try_flatten(); /// assert_eq!(stream.next().await, Some(Ok(1))); /// assert_eq!(stream.next().await, Some(Ok(2))); /// assert_eq!(stream.next().await, Some(Err(3))); + /// assert_eq!(stream.next().await, Some(Ok(4))); + /// assert_eq!(stream.next().await, Some(Err(5))); + /// assert_eq!(stream.next().await, None); /// # }); /// ``` fn try_flatten(self) -> TryFlatten<Self> @@ -1001,6 +1005,7 @@ pub trait TryStreamExt: TryStream { /// Wraps a [`TryStream`] into a stream compatible with libraries using /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled. /// ``` + /// # if cfg!(miri) { return; } // Miri does not support epoll /// use futures::future::{FutureExt, TryFutureExt}; /// # let (tx, rx) = futures::channel::oneshot::channel(); /// |