aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-04-02 17:37:05 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-04-02 17:37:05 +0000
commitbd42804c0d82879869969a7103f63c4efccdbffe (patch)
tree931c1432113b74ac060d81ede8b4cbf7d3abfd76 /src/stream
parent2d504b322047484e1755858ff567f71f0e26490c (diff)
parentac32bfb85d74a0c3d5733f72a0571b4d77b23597 (diff)
downloadfutures-util-bd42804c0d82879869969a7103f63c4efccdbffe.tar.gz
Upgrade rust/crates/futures-util to 0.3.13 am: 1fdff8fd4c am: cf1d92ef03 am: ac32bfb85d
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/1662922 Change-Id: I2ae408bfa218468323fecc75cab01c06ba6c8b8b
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/empty.rs5
-rw-r--r--src/stream/futures_unordered/mod.rs34
-rw-r--r--src/stream/iter.rs5
-rw-r--r--src/stream/mod.rs6
-rw-r--r--src/stream/once.rs3
-rw-r--r--src/stream/pending.rs3
-rw-r--r--src/stream/poll_fn.rs3
-rw-r--r--src/stream/repeat.rs3
-rw-r--r--src/stream/repeat_with.rs3
-rw-r--r--src/stream/select.rs5
-rw-r--r--src/stream/select_all.rs3
-rw-r--r--src/stream/stream/mod.rs32
-rw-r--r--src/stream/try_stream/mod.rs10
-rw-r--r--src/stream/try_stream/try_unfold.rs5
-rw-r--r--src/stream/unfold.rs5
15 files changed, 72 insertions, 53 deletions
diff --git a/src/stream/empty.rs b/src/stream/empty.rs
index d228b31..c629a4b 100644
--- a/src/stream/empty.rs
+++ b/src/stream/empty.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::marker::PhantomData;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
@@ -14,9 +15,9 @@ pub struct Empty<T> {
///
/// The returned stream will always return `Ready(None)` when polled.
pub fn empty<T>() -> Empty<T> {
- Empty {
+ assert_stream::<T, _>(Empty {
_phantom: PhantomData
- }
+ })
}
impl<T> Unpin for Empty<T> {}
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 37b7d7e..8dcc551 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -30,22 +30,6 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue};
-/// Constant used for a `FuturesUnordered` to determine how many times it is
-/// allowed to poll underlying futures without yielding.
-///
-/// A single call to `poll_next` may potentially do a lot of work before
-/// yielding. This happens in particular if the underlying futures are awoken
-/// frequently but continue to return `Pending`. This is problematic if other
-/// tasks are waiting on the executor, since they do not get to run. This value
-/// caps the number of calls to `poll` on underlying futures a single call to
-/// `poll_next` is allowed to make.
-///
-/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
-/// that amortize wakeup and scheduling costs, but low enough that we do not
-/// starve other tasks for long.
-///
-/// See also https://github.com/rust-lang/futures-rs/issues/2047.
-const YIELD_EVERY: usize = 32;
/// A set of futures which may complete in any order.
///
@@ -414,6 +398,22 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
+ // Variable to determine how many times it is allowed to poll underlying
+ // futures without yielding.
+ //
+ // A single call to `poll_next` may potentially do a lot of work before
+ // yielding. This happens in particular if the underlying futures are awoken
+ // frequently but continue to return `Pending`. This is problematic if other
+ // tasks are waiting on the executor, since they do not get to run. This value
+ // caps the number of calls to `poll` on underlying futures a single call to
+ // `poll_next` is allowed to make.
+ //
+ // The value is the length of FuturesUnordered. This ensures that each
+ // future is polled only once at most per iteration.
+ //
+ // See also https://github.com/rust-lang/futures-rs/issues/2047.
+ let yield_every = self.len();
+
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
@@ -548,7 +548,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
let task = bomb.task.take().unwrap();
bomb.queue.link(task);
- if polled == YIELD_EVERY {
+ if polled == yield_every {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
diff --git a/src/stream/iter.rs b/src/stream/iter.rs
index cab8cd8..033dae1 100644
--- a/src/stream/iter.rs
+++ b/src/stream/iter.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::pin::Pin;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
@@ -28,9 +29,9 @@ impl<I> Unpin for Iter<I> {}
pub fn iter<I>(i: I) -> Iter<I::IntoIter>
where I: IntoIterator,
{
- Iter {
+ assert_stream::<I::Item, _>(Iter {
iter: i.into_iter(),
- }
+ })
}
impl<I> Stream for Iter<I>
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index a5624ba..f3b2baa 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -109,11 +109,11 @@ cfg_target_has_atomic! {
pub use self::select_all::{select_all, SelectAll};
}
-// Just a helper function to ensure the futures we're returning all have the
+// Just a helper function to ensure the streams we're returning all have the
// right implementations.
pub(crate) fn assert_stream<T, S>(stream: S) -> S
- where
- S: Stream<Item = T>,
+where
+ S: Stream<Item = T>,
{
stream
}
diff --git a/src/stream/once.rs b/src/stream/once.rs
index 318de07..e16fe00 100644
--- a/src/stream/once.rs
+++ b/src/stream/once.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
@@ -17,7 +18,7 @@ use pin_project_lite::pin_project;
/// # });
/// ```
pub fn once<Fut: Future>(future: Fut) -> Once<Fut> {
- Once::new(future)
+ assert_stream::<Fut::Output, _>(Once::new(future))
}
pin_project! {
diff --git a/src/stream/pending.rs b/src/stream/pending.rs
index ca793c1..d7030ff 100644
--- a/src/stream/pending.rs
+++ b/src/stream/pending.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::marker;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
@@ -14,7 +15,7 @@ pub struct Pending<T> {
///
/// The returned stream will always return `Pending` when polled.
pub fn pending<T>() -> Pending<T> {
- Pending { _data: marker::PhantomData }
+ assert_stream::<T, _>(Pending { _data: marker::PhantomData })
}
impl<T> Unpin for Pending<T> {}
diff --git a/src/stream/poll_fn.rs b/src/stream/poll_fn.rs
index e33ca57..b9bd7d1 100644
--- a/src/stream/poll_fn.rs
+++ b/src/stream/poll_fn.rs
@@ -1,5 +1,6 @@
//! Definition of the `PollFn` combinator
+use super::assert_stream;
use core::fmt;
use core::pin::Pin;
use futures_core::stream::Stream;
@@ -41,7 +42,7 @@ pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
{
- PollFn { f }
+ assert_stream::<T, _>(PollFn { f })
}
impl<T, F> Stream for PollFn<F>
diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs
index 6a2637d..cf9f21b 100644
--- a/src/stream/repeat.rs
+++ b/src/stream/repeat.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::pin::Pin;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
@@ -26,7 +27,7 @@ pub struct Repeat<T> {
pub fn repeat<T>(item: T) -> Repeat<T>
where T: Clone
{
- Repeat { item }
+ assert_stream::<T, _>(Repeat { item })
}
impl<T> Unpin for Repeat<T> {}
diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs
index eb3313d..0255643 100644
--- a/src/stream/repeat_with.rs
+++ b/src/stream/repeat_with.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::pin::Pin;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
@@ -89,5 +90,5 @@ impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F>
/// # });
/// ```
pub fn repeat_with<A, F: FnMut() -> A>(repeater: F) -> RepeatWith<F> {
- RepeatWith { repeater }
+ assert_stream::<A, _>(RepeatWith { repeater })
}
diff --git a/src/stream/select.rs b/src/stream/select.rs
index 2b7ebec..2942494 100644
--- a/src/stream/select.rs
+++ b/src/stream/select.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use crate::stream::{StreamExt, Fuse};
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
@@ -31,11 +32,11 @@ pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
where St1: Stream,
St2: Stream<Item = St1::Item>
{
- Select {
+ assert_stream::<St1::Item, _>(Select {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
flag: false,
- }
+ })
}
impl<St1, St2> Select<St1, St2> {
diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs
index 00368bb..c0b92fa 100644
--- a/src/stream/select_all.rs
+++ b/src/stream/select_all.rs
@@ -8,6 +8,7 @@ use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
+use super::assert_stream;
use crate::stream::{StreamExt, StreamFuture, FuturesUnordered};
/// An unbounded set of streams
@@ -124,7 +125,7 @@ pub fn select_all<I>(streams: I) -> SelectAll<I::Item>
set.push(stream);
}
- set
+ assert_stream::<<I::Item as Stream>::Item, _>(set)
}
impl<St: Stream + Unpin> FromIterator<St> for SelectAll<St> {
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index b1b4384..c3340ec 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -4,8 +4,11 @@
//! including the `StreamExt` trait which adds methods to `Stream` types.
use crate::future::{assert_future, Either};
+use crate::stream::assert_stream;
#[cfg(feature = "alloc")]
use alloc::boxed::Box;
+#[cfg(feature = "alloc")]
+use alloc::vec::Vec;
use core::pin::Pin;
#[cfg(feature = "sink")]
use futures_core::stream::TryStream;
@@ -19,7 +22,7 @@ use futures_core::{
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use crate::fns::{InspectFn, inspect_fn};
+use crate::fns::{inspect_fn, InspectFn};
mod chain;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -201,7 +204,6 @@ mod catch_unwind;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::catch_unwind::CatchUnwind;
-use crate::stream::assert_stream;
impl<T: ?Sized> StreamExt for T where T: Stream {}
@@ -689,7 +691,7 @@ pub trait StreamExt: Stream {
U: Stream,
Self: Sized,
{
- FlatMap::new(self, f)
+ assert_stream::<U::Item, _>(FlatMap::new(self, f))
}
/// Combinator similar to [`StreamExt::fold`] that holds internal state
@@ -722,7 +724,7 @@ pub trait StreamExt: Stream {
Fut: Future<Output = Option<B>>,
Self: Sized,
{
- Scan::new(self, initial_state, f)
+ assert_stream::<B, _>(Scan::new(self, initial_state, f))
}
/// Skip elements on this stream while the provided asynchronous predicate
@@ -793,7 +795,7 @@ pub trait StreamExt: Stream {
/// this stream combinator will always return that the stream is done.
///
/// The stopping future may return any type. Once the stream is stopped
- /// the result of the stopping future may be aceessed with `TakeUntil::take_result()`.
+ /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
/// The stream may also be resumed with `TakeUntil::take_future()`.
/// See the documentation of [`TakeUntil`] for more information.
///
@@ -827,7 +829,7 @@ pub trait StreamExt: Stream {
Fut: Future,
Self: Sized,
{
- TakeUntil::new(self, fut)
+ assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
}
/// Runs this stream to completion, executing the provided asynchronous
@@ -1289,7 +1291,7 @@ pub trait StreamExt: Stream {
where
Self: Sized,
{
- assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity))
+ assert_stream::<Vec<Self::Item>, _>(Chunks::new(self, capacity))
}
/// An adaptor for chunking up ready items of the stream inside a vector.
@@ -1312,10 +1314,10 @@ pub trait StreamExt: Stream {
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
- where
- Self: Sized,
+ where
+ Self: Sized,
{
- ReadyChunks::new(self, capacity)
+ assert_stream::<Vec<Self::Item>, _>(ReadyChunks::new(self, capacity))
}
/// A future that completes after the given stream has been fully processed
@@ -1334,7 +1336,10 @@ pub trait StreamExt: Stream {
where
S: Sink<Self::Ok, Error = Self::Error>,
Self: TryStream + Sized,
+ // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
{
+ // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
+ // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
Forward::new(self, sink)
}
@@ -1356,7 +1361,10 @@ pub trait StreamExt: Stream {
Self: Sink<Item> + Sized,
{
let (sink, stream) = split::split(self);
- (sink, assert_stream::<Self::Item, _>(stream))
+ (
+ crate::sink::assert_sink::<Item, Self::Error, _>(sink),
+ assert_stream::<Self::Item, _>(stream),
+ )
}
/// Do something with each item of this stream, afterwards passing it on.
@@ -1459,6 +1467,6 @@ pub trait StreamExt: Stream {
where
Self: Unpin + FusedStream,
{
- SelectNextSome::new(self)
+ assert_future::<Self::Item, _>(SelectNextSome::new(self))
}
}
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 6a48a4c..b7353d9 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -14,7 +14,9 @@ use futures_core::{
use crate::fns::{
InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn,
};
+use crate::future::assert_future;
use crate::stream::{Map, Inspect};
+use crate::stream::assert_stream;
mod and_then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -135,8 +137,6 @@ mod into_async_read;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::into_async_read::IntoAsyncRead;
-use crate::future::assert_future;
-use crate::stream::assert_stream;
impl<S: ?Sized + TryStream> TryStreamExt for S {}
@@ -471,7 +471,7 @@ pub trait TryStreamExt: TryStream {
Fut: TryFuture<Ok = bool, Error = Self::Error>,
Self: Sized,
{
- TryTakeWhile::new(self, f)
+ assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
}
/// Attempts to run this stream to completion, executing the provided asynchronous
@@ -919,7 +919,7 @@ pub trait TryStreamExt: TryStream {
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
{
- TryBuffered::new(self, n)
+ assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(self, n))
}
// TODO: false positive warning from rustdoc. Verify once #43466 settles
@@ -997,6 +997,6 @@ pub trait TryStreamExt: TryStream {
Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
Self::Ok: AsRef<[u8]>,
{
- IntoAsyncRead::new(self)
+ crate::io::assert_read(IntoAsyncRead::new(self))
}
}
diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs
index c8fc421..258c18e 100644
--- a/src/stream/try_stream/try_unfold.rs
+++ b/src/stream/try_stream/try_unfold.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
@@ -60,11 +61,11 @@ where
F: FnMut(T) -> Fut,
Fut: TryFuture<Ok = Option<(Item, T)>>,
{
- TryUnfold {
+ assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold {
f,
state: Some(init),
fut: None,
- }
+ })
}
pin_project! {
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs
index 473bb67..e17d465 100644
--- a/src/stream/unfold.rs
+++ b/src/stream/unfold.rs
@@ -1,3 +1,4 @@
+use super::assert_stream;
use crate::unfold_state::UnfoldState;
use core::fmt;
use core::pin::Pin;
@@ -51,10 +52,10 @@ where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
- Unfold {
+ assert_stream::<Item, _>(Unfold {
f,
state: UnfoldState::Value { value: init },
- }
+ })
}
pin_project! {