diff options
author | Joel Galenson <jgalenson@google.com> | 2021-04-02 17:37:05 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-04-02 17:37:05 +0000 |
commit | bd42804c0d82879869969a7103f63c4efccdbffe (patch) | |
tree | 931c1432113b74ac060d81ede8b4cbf7d3abfd76 /src/stream | |
parent | 2d504b322047484e1755858ff567f71f0e26490c (diff) | |
parent | ac32bfb85d74a0c3d5733f72a0571b4d77b23597 (diff) | |
download | futures-util-bd42804c0d82879869969a7103f63c4efccdbffe.tar.gz |
Upgrade rust/crates/futures-util to 0.3.13 am: 1fdff8fd4c am: cf1d92ef03 am: ac32bfb85d
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/1662922
Change-Id: I2ae408bfa218468323fecc75cab01c06ba6c8b8b
Diffstat (limited to 'src/stream')
-rw-r--r-- | src/stream/empty.rs | 5 | ||||
-rw-r--r-- | src/stream/futures_unordered/mod.rs | 34 | ||||
-rw-r--r-- | src/stream/iter.rs | 5 | ||||
-rw-r--r-- | src/stream/mod.rs | 6 | ||||
-rw-r--r-- | src/stream/once.rs | 3 | ||||
-rw-r--r-- | src/stream/pending.rs | 3 | ||||
-rw-r--r-- | src/stream/poll_fn.rs | 3 | ||||
-rw-r--r-- | src/stream/repeat.rs | 3 | ||||
-rw-r--r-- | src/stream/repeat_with.rs | 3 | ||||
-rw-r--r-- | src/stream/select.rs | 5 | ||||
-rw-r--r-- | src/stream/select_all.rs | 3 | ||||
-rw-r--r-- | src/stream/stream/mod.rs | 32 | ||||
-rw-r--r-- | src/stream/try_stream/mod.rs | 10 | ||||
-rw-r--r-- | src/stream/try_stream/try_unfold.rs | 5 | ||||
-rw-r--r-- | src/stream/unfold.rs | 5 |
15 files changed, 72 insertions, 53 deletions
diff --git a/src/stream/empty.rs b/src/stream/empty.rs index d228b31..c629a4b 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::marker::PhantomData; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -14,9 +15,9 @@ pub struct Empty<T> { /// /// The returned stream will always return `Ready(None)` when polled. pub fn empty<T>() -> Empty<T> { - Empty { + assert_stream::<T, _>(Empty { _phantom: PhantomData - } + }) } impl<T> Unpin for Empty<T> {} diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 37b7d7e..8dcc551 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -30,22 +30,6 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; -/// Constant used for a `FuturesUnordered` to determine how many times it is -/// allowed to poll underlying futures without yielding. -/// -/// A single call to `poll_next` may potentially do a lot of work before -/// yielding. This happens in particular if the underlying futures are awoken -/// frequently but continue to return `Pending`. This is problematic if other -/// tasks are waiting on the executor, since they do not get to run. This value -/// caps the number of calls to `poll` on underlying futures a single call to -/// `poll_next` is allowed to make. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough -/// that amortize wakeup and scheduling costs, but low enough that we do not -/// starve other tasks for long. -/// -/// See also https://github.com/rust-lang/futures-rs/issues/2047. -const YIELD_EVERY: usize = 32; /// A set of futures which may complete in any order. /// @@ -414,6 +398,22 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + // Variable to determine how many times it is allowed to poll underlying + // futures without yielding. + // + // A single call to `poll_next` may potentially do a lot of work before + // yielding. This happens in particular if the underlying futures are awoken + // frequently but continue to return `Pending`. This is problematic if other + // tasks are waiting on the executor, since they do not get to run. This value + // caps the number of calls to `poll` on underlying futures a single call to + // `poll_next` is allowed to make. + // + // The value is the length of FuturesUnordered. This ensures that each + // future is polled only once at most per iteration. + // + // See also https://github.com/rust-lang/futures-rs/issues/2047. + let yield_every = self.len(); + // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; @@ -548,7 +548,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { let task = bomb.task.take().unwrap(); bomb.queue.link(task); - if polled == YIELD_EVERY { + if polled == yield_every { // We have polled a large number of futures in a row without yielding. // To ensure we do not starve other tasks waiting on the executor, // we yield here, but immediately wake ourselves up to continue. diff --git a/src/stream/iter.rs b/src/stream/iter.rs index cab8cd8..033dae1 100644 --- a/src/stream/iter.rs +++ b/src/stream/iter.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -28,9 +29,9 @@ impl<I> Unpin for Iter<I> {} pub fn iter<I>(i: I) -> Iter<I::IntoIter> where I: IntoIterator, { - Iter { + assert_stream::<I::Item, _>(Iter { iter: i.into_iter(), - } + }) } impl<I> Stream for Iter<I> diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a5624ba..f3b2baa 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -109,11 +109,11 @@ cfg_target_has_atomic! { pub use self::select_all::{select_all, SelectAll}; } -// Just a helper function to ensure the futures we're returning all have the +// Just a helper function to ensure the streams we're returning all have the // right implementations. pub(crate) fn assert_stream<T, S>(stream: S) -> S - where - S: Stream<Item = T>, +where + S: Stream<Item = T>, { stream } diff --git a/src/stream/once.rs b/src/stream/once.rs index 318de07..e16fe00 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; @@ -17,7 +18,7 @@ use pin_project_lite::pin_project; /// # }); /// ``` pub fn once<Fut: Future>(future: Fut) -> Once<Fut> { - Once::new(future) + assert_stream::<Fut::Output, _>(Once::new(future)) } pin_project! { diff --git a/src/stream/pending.rs b/src/stream/pending.rs index ca793c1..d7030ff 100644 --- a/src/stream/pending.rs +++ b/src/stream/pending.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::marker; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -14,7 +15,7 @@ pub struct Pending<T> { /// /// The returned stream will always return `Pending` when polled. pub fn pending<T>() -> Pending<T> { - Pending { _data: marker::PhantomData } + assert_stream::<T, _>(Pending { _data: marker::PhantomData }) } impl<T> Unpin for Pending<T> {} diff --git a/src/stream/poll_fn.rs b/src/stream/poll_fn.rs index e33ca57..b9bd7d1 100644 --- a/src/stream/poll_fn.rs +++ b/src/stream/poll_fn.rs @@ -1,5 +1,6 @@ //! Definition of the `PollFn` combinator +use super::assert_stream; use core::fmt; use core::pin::Pin; use futures_core::stream::Stream; @@ -41,7 +42,7 @@ pub fn poll_fn<T, F>(f: F) -> PollFn<F> where F: FnMut(&mut Context<'_>) -> Poll<Option<T>>, { - PollFn { f } + assert_stream::<T, _>(PollFn { f }) } impl<T, F> Stream for PollFn<F> diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index 6a2637d..cf9f21b 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; @@ -26,7 +27,7 @@ pub struct Repeat<T> { pub fn repeat<T>(item: T) -> Repeat<T> where T: Clone { - Repeat { item } + assert_stream::<T, _>(Repeat { item }) } impl<T> Unpin for Repeat<T> {} diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index eb3313d..0255643 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; @@ -89,5 +90,5 @@ impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F> /// # }); /// ``` pub fn repeat_with<A, F: FnMut() -> A>(repeater: F) -> RepeatWith<F> { - RepeatWith { repeater } + assert_stream::<A, _>(RepeatWith { repeater }) } diff --git a/src/stream/select.rs b/src/stream/select.rs index 2b7ebec..2942494 100644 --- a/src/stream/select.rs +++ b/src/stream/select.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use crate::stream::{StreamExt, Fuse}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -31,11 +32,11 @@ pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2> where St1: Stream, St2: Stream<Item = St1::Item> { - Select { + assert_stream::<St1::Item, _>(Select { stream1: stream1.fuse(), stream2: stream2.fuse(), flag: false, - } + }) } impl<St1, St2> Select<St1, St2> { diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs index 00368bb..c0b92fa 100644 --- a/src/stream/select_all.rs +++ b/src/stream/select_all.rs @@ -8,6 +8,7 @@ use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; +use super::assert_stream; use crate::stream::{StreamExt, StreamFuture, FuturesUnordered}; /// An unbounded set of streams @@ -124,7 +125,7 @@ pub fn select_all<I>(streams: I) -> SelectAll<I::Item> set.push(stream); } - set + assert_stream::<<I::Item as Stream>::Item, _>(set) } impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b1b4384..c3340ec 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -4,8 +4,11 @@ //! including the `StreamExt` trait which adds methods to `Stream` types. use crate::future::{assert_future, Either}; +use crate::stream::assert_stream; #[cfg(feature = "alloc")] use alloc::boxed::Box; +#[cfg(feature = "alloc")] +use alloc::vec::Vec; use core::pin::Pin; #[cfg(feature = "sink")] use futures_core::stream::TryStream; @@ -19,7 +22,7 @@ use futures_core::{ #[cfg(feature = "sink")] use futures_sink::Sink; -use crate::fns::{InspectFn, inspect_fn}; +use crate::fns::{inspect_fn, InspectFn}; mod chain; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -201,7 +204,6 @@ mod catch_unwind; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::catch_unwind::CatchUnwind; -use crate::stream::assert_stream; impl<T: ?Sized> StreamExt for T where T: Stream {} @@ -689,7 +691,7 @@ pub trait StreamExt: Stream { U: Stream, Self: Sized, { - FlatMap::new(self, f) + assert_stream::<U::Item, _>(FlatMap::new(self, f)) } /// Combinator similar to [`StreamExt::fold`] that holds internal state @@ -722,7 +724,7 @@ pub trait StreamExt: Stream { Fut: Future<Output = Option<B>>, Self: Sized, { - Scan::new(self, initial_state, f) + assert_stream::<B, _>(Scan::new(self, initial_state, f)) } /// Skip elements on this stream while the provided asynchronous predicate @@ -793,7 +795,7 @@ pub trait StreamExt: Stream { /// this stream combinator will always return that the stream is done. /// /// The stopping future may return any type. Once the stream is stopped - /// the result of the stopping future may be aceessed with `TakeUntil::take_result()`. + /// the result of the stopping future may be accessed with `TakeUntil::take_result()`. /// The stream may also be resumed with `TakeUntil::take_future()`. /// See the documentation of [`TakeUntil`] for more information. /// @@ -827,7 +829,7 @@ pub trait StreamExt: Stream { Fut: Future, Self: Sized, { - TakeUntil::new(self, fut) + assert_stream::<Self::Item, _>(TakeUntil::new(self, fut)) } /// Runs this stream to completion, executing the provided asynchronous @@ -1289,7 +1291,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity)) + assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity)) } /// An adaptor for chunking up ready items of the stream inside a vector. @@ -1312,10 +1314,10 @@ pub trait StreamExt: Stream { /// This method will panic if `capacity` is zero. #[cfg(feature = "alloc")] fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> - where - Self: Sized, + where + Self: Sized, { - ReadyChunks::new(self, capacity) + assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity)) } /// A future that completes after the given stream has been fully processed @@ -1334,7 +1336,10 @@ pub trait StreamExt: Stream { where S: Sink<Self::Ok, Error = Self::Error>, Self: TryStream + Sized, + // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>, { + // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>` + // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink)) Forward::new(self, sink) } @@ -1356,7 +1361,10 @@ pub trait StreamExt: Stream { Self: Sink<Item> + Sized, { let (sink, stream) = split::split(self); - (sink, assert_stream::<Self::Item, _>(stream)) + ( + crate::sink::assert_sink::<Item, Self::Error, _>(sink), + assert_stream::<Self::Item, _>(stream), + ) } /// Do something with each item of this stream, afterwards passing it on. @@ -1459,6 +1467,6 @@ pub trait StreamExt: Stream { where Self: Unpin + FusedStream, { - SelectNextSome::new(self) + assert_future::<Self::Item, _>(SelectNextSome::new(self)) } } diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 6a48a4c..b7353d9 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -14,7 +14,9 @@ use futures_core::{ use crate::fns::{ InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn, }; +use crate::future::assert_future; use crate::stream::{Map, Inspect}; +use crate::stream::assert_stream; mod and_then; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -135,8 +137,6 @@ mod into_async_read; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::into_async_read::IntoAsyncRead; -use crate::future::assert_future; -use crate::stream::assert_stream; impl<S: ?Sized + TryStream> TryStreamExt for S {} @@ -471,7 +471,7 @@ pub trait TryStreamExt: TryStream { Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized, { - TryTakeWhile::new(self, f) + assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f)) } /// Attempts to run this stream to completion, executing the provided asynchronous @@ -919,7 +919,7 @@ pub trait TryStreamExt: TryStream { Self::Ok: TryFuture<Error = Self::Error>, Self: Sized, { - TryBuffered::new(self, n) + assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(self, n)) } // TODO: false positive warning from rustdoc. Verify once #43466 settles @@ -997,6 +997,6 @@ pub trait TryStreamExt: TryStream { Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin, Self::Ok: AsRef<[u8]>, { - IntoAsyncRead::new(self) + crate::io::assert_read(IntoAsyncRead::new(self)) } } diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs index c8fc421..258c18e 100644 --- a/src/stream/try_stream/try_unfold.rs +++ b/src/stream/try_stream/try_unfold.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; @@ -60,11 +61,11 @@ where F: FnMut(T) -> Fut, Fut: TryFuture<Ok = Option<(Item, T)>>, { - TryUnfold { + assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None, - } + }) } pin_project! { diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index 473bb67..e17d465 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs @@ -1,3 +1,4 @@ +use super::assert_stream; use crate::unfold_state::UnfoldState; use core::fmt; use core::pin::Pin; @@ -51,10 +52,10 @@ where F: FnMut(T) -> Fut, Fut: Future<Output = Option<(Item, T)>>, { - Unfold { + assert_stream::<Item, _>(Unfold { f, state: UnfoldState::Value { value: init }, - } + }) } pin_project! { |