aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-08-09 10:32:56 -0700
committerJoel Galenson <jgalenson@google.com>2021-08-09 10:32:56 -0700
commit41d5e42e28697b0d96aeaf171e603c1c9c606bd9 (patch)
treef953b1049d550955e09fc83d2806cf23c73cd3fe
parent590d610b243cead0fb895039731f4312f9903db1 (diff)
downloadfutures-util-41d5e42e28697b0d96aeaf171e603c1c9c606bd9.tar.gz
Upgrade rust/crates/futures-util to 0.3.16
Test: make Change-Id: I2b7f571745922fbf3e3eba05c872ddadbfba5dd7
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp30
-rw-r--r--Cargo.toml23
-rw-r--r--Cargo.toml.orig14
-rw-r--r--METADATA8
-rw-r--r--build.rs2
-rw-r--r--no_atomic_cas.rs2
-rw-r--r--src/future/mod.rs17
-rw-r--r--src/io/fill_buf.rs2
-rw-r--r--src/io/into_sink.rs1
-rw-r--r--src/io/write_all_vectored.rs8
-rw-r--r--src/lib.rs33
-rw-r--r--src/lock/mod.rs35
-rw-r--r--src/lock/mutex.rs1
-rw-r--r--src/sink/buffer.rs2
-rw-r--r--src/stream/futures_unordered/mod.rs2
-rw-r--r--src/stream/mod.rs62
-rw-r--r--src/stream/select.rs79
-rw-r--r--src/stream/select_with_strategy.rs229
-rw-r--r--src/stream/stream/all.rs92
-rw-r--r--src/stream/stream/any.rs92
-rw-r--r--src/stream/stream/mod.rs110
-rw-r--r--src/stream/try_stream/mod.rs93
-rw-r--r--src/stream/try_stream/try_chunks.rs131
-rw-r--r--src/task/mod.rs20
25 files changed, 878 insertions, 212 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index ec6442e..99dc8b0 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "fc080d153bc7bf00429ec5e2b91e2f21f2243846"
+ "sha1": "ab38fd29d3f84f8fc028fa7883e53dba423da0ee"
}
}
diff --git a/Android.bp b/Android.bp
index 3c3ea81..859f9da 100644
--- a/Android.bp
+++ b/Android.bp
@@ -38,7 +38,7 @@ license {
}
rust_defaults {
- name: "futures-util_defaults",
+ name: "futures-util_test_defaults",
crate_name: "futures_util",
srcs: ["src/lib.rs"],
test_suites: ["general-tests"],
@@ -84,7 +84,7 @@ rust_defaults {
rust_test_host {
name: "futures-util_host_test_src_lib",
- defaults: ["futures-util_defaults"],
+ defaults: ["futures-util_test_defaults"],
test_options: {
unit_test: true,
},
@@ -92,7 +92,7 @@ rust_test_host {
rust_test {
name: "futures-util_device_test_src_lib",
- defaults: ["futures-util_defaults"],
+ defaults: ["futures-util_test_defaults"],
}
rust_library {
@@ -150,21 +150,21 @@ rust_library {
// bytes-0.4.12
// cfg-if-0.1.10
// cfg-if-1.0.0
-// crossbeam-deque-0.7.3
+// crossbeam-deque-0.7.4
// crossbeam-epoch-0.8.2 "default,lazy_static,std"
// crossbeam-queue-0.2.3 "default,std"
// crossbeam-utils-0.7.2 "default,lazy_static,std"
// fnv-1.0.7 "default,std"
// futures-0.1.31 "default,use_std,with-deprecated"
-// futures-channel-0.3.15 "alloc,std"
-// futures-core-0.3.15 "alloc,std"
-// futures-io-0.3.15 "std"
-// futures-macro-0.3.15
-// futures-sink-0.3.15
-// futures-task-0.3.15 "alloc,std"
+// futures-channel-0.3.16 "alloc,std"
+// futures-core-0.3.16 "alloc,std"
+// futures-io-0.3.16 "std"
+// futures-macro-0.3.16
+// futures-sink-0.3.16
+// futures-task-0.3.16 "alloc,std"
// iovec-0.1.4
// lazy_static-1.4.0
-// libc-0.2.94 "default,std"
+// libc-0.2.98 "default,std"
// lock_api-0.3.4
// log-0.4.14
// maybe-uninit-2.0.0
@@ -176,19 +176,19 @@ rust_library {
// num_cpus-1.13.0
// parking_lot-0.9.0 "default"
// parking_lot_core-0.6.2
-// pin-project-lite-0.2.6
+// pin-project-lite-0.2.7
// pin-utils-0.1.0
// proc-macro-hack-0.5.19
// proc-macro-nested-0.1.7
-// proc-macro2-1.0.26 "default,proc-macro"
+// proc-macro2-1.0.28 "default,proc-macro"
// quote-1.0.9 "default,proc-macro"
// rustc_version-0.2.3
// scopeguard-1.1.0
// semver-0.9.0 "default"
// semver-parser-0.7.0
-// slab-0.4.3 "default,std"
+// slab-0.4.4 "default,std"
// smallvec-0.6.14 "default,std"
-// syn-1.0.72 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
+// syn-1.0.74 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
// tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds"
// tokio-codec-0.1.2
// tokio-current-thread-0.1.7
diff --git a/Cargo.toml b/Cargo.toml
index d4b5664..aaaebac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,17 +3,16 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
edition = "2018"
name = "futures-util"
-version = "0.3.15"
+version = "0.3.16"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
description = "Common utilities and extension traits for the futures-rs library.\n"
homepage = "https://rust-lang.github.io/futures-rs"
@@ -24,33 +23,33 @@ repository = "https://github.com/rust-lang/futures-rs"
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies.futures-channel]
-version = "0.3.15"
+version = "0.3.16"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.15"
+version = "0.3.16"
default-features = false
[dependencies.futures-io]
-version = "0.3.15"
+version = "0.3.16"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.15"
+version = "=0.3.16"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.15"
+version = "0.3.16"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.15"
+version = "0.3.16"
default-features = false
[dependencies.futures_01]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index de77fda..98cace6 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
[package]
name = "futures-util"
edition = "2018"
-version = "0.3.15"
+version = "0.3.16"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
@@ -39,12 +39,12 @@ cfg-target-has-atomic = []
autocfg = "1"
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.15", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.15", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.15", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.15", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.15", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.15", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.16", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.16", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.16", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.16", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.16", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.16", default-features = false, optional = true }
proc-macro-hack = { version = "0.5.19", optional = true }
proc-macro-nested = { version = "0.1.2", optional = true }
slab = { version = "0.4.2", optional = true }
diff --git a/METADATA b/METADATA
index 85b6846..ce359d5 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-util/futures-util-0.3.15.crate"
+ value: "https://static.crates.io/crates/futures-util/futures-util-0.3.16.crate"
}
- version: "0.3.15"
+ version: "0.3.16"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 5
- day: 19
+ month: 8
+ day: 9
}
}
diff --git a/build.rs b/build.rs
index ffe9711..f8aa5fe 100644
--- a/build.rs
+++ b/build.rs
@@ -9,7 +9,7 @@ include!("no_atomic_cas.rs");
// and outside of the normal semver guarantees:
//
// - `futures_no_atomic_cas`
-// Assume the target does not have atomic CAS (compare-and-swap).
+// Assume the target does *not* support atomic CAS operations.
// This is usually detected automatically by the build script, but you may
// need to enable it manually when building for custom targets or using
// non-cargo build systems that don't run the build script.
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs
index 0819af1..4708bf8 100644
--- a/no_atomic_cas.rs
+++ b/no_atomic_cas.rs
@@ -3,6 +3,8 @@
const NO_ATOMIC_CAS_TARGETS: &[&str] = &[
"avr-unknown-gnu-atmega328",
+ "bpfeb-unknown-none",
+ "bpfel-unknown-none",
"msp430-none-elf",
"riscv32i-unknown-none-elf",
"riscv32imc-unknown-none-elf",
diff --git a/src/future/mod.rs b/src/future/mod.rs
index 7a63e5f..cd08264 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -108,14 +108,15 @@ pub use self::select_ok::{select_ok, SelectOk};
mod either;
pub use self::either::Either;
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- mod abortable;
- #[cfg(feature = "alloc")]
- pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted};
- #[cfg(feature = "alloc")]
- pub use abortable::abortable;
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod abortable;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted};
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use abortable::abortable;
// Just a helper function to ensure the futures we're returning all have the
// right implementations.
diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs
index 19b0d20..a1484c0 100644
--- a/src/io/fill_buf.rs
+++ b/src/io/fill_buf.rs
@@ -30,7 +30,7 @@ where
let reader = this.reader.take().expect("Polled FillBuf after completion");
match Pin::new(&mut *reader).poll_fill_buf(cx) {
- // With polinius it is possible to remove this inner match and just have the correct
+ // With polonius it is possible to remove this inner match and just have the correct
// lifetime of the reference inferred based on which branch is taken
Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
diff --git a/src/io/into_sink.rs b/src/io/into_sink.rs
index 384b8e3..6a41ee2 100644
--- a/src/io/into_sink.rs
+++ b/src/io/into_sink.rs
@@ -62,7 +62,6 @@ impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> {
Poll::Ready(Ok(()))
}
- #[allow(clippy::debug_assert_with_mut_call)]
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
debug_assert!(self.buffer.is_none());
*self.project().buffer = Some(Block { offset: 0, bytes: item });
diff --git a/src/io/write_all_vectored.rs b/src/io/write_all_vectored.rs
index f465209..a8fc4c6 100644
--- a/src/io/write_all_vectored.rs
+++ b/src/io/write_all_vectored.rs
@@ -4,7 +4,6 @@ use futures_core::task::{Context, Poll};
use futures_io::AsyncWrite;
use futures_io::IoSlice;
use std::io;
-use std::mem;
use std::pin::Pin;
/// Future for the
@@ -19,8 +18,9 @@ pub struct WriteAllVectored<'a, W: ?Sized + Unpin> {
impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> {
- pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self {
- Self { writer, bufs: IoSlice::advance(bufs, 0) }
+ pub(super) fn new(writer: &'a mut W, mut bufs: &'a mut [IoSlice<'a>]) -> Self {
+ IoSlice::advance_slices(&mut bufs, 0);
+ Self { writer, bufs }
}
}
@@ -34,7 +34,7 @@ impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAllVectored<'_, W> {
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
} else {
- this.bufs = IoSlice::advance(mem::take(&mut this.bufs), n);
+ IoSlice::advance_slices(&mut this.bufs, n);
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 16871cb..5f803a7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,11 +4,20 @@
#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
#![cfg_attr(not(feature = "std"), no_std)]
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
-// It cannot be included in the published code because this lints have false positives in the minimum required version.
-#![cfg_attr(test, warn(single_use_lifetimes))]
-#![warn(clippy::all)]
-#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ single_use_lifetimes,
+ unreachable_pub
+)]
+#![doc(test(
+ no_crate_inject,
+ attr(
+ deny(warnings, rust_2018_idioms, single_use_lifetimes),
+ allow(dead_code, unused_assignments, unused_variables)
+ )
+))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
@@ -47,13 +56,6 @@ pub mod __private {
}
}
-macro_rules! cfg_target_has_atomic {
- ($($item:item)*) => {$(
- #[cfg(not(futures_no_atomic_cas))]
- $item
- )*};
-}
-
#[cfg(feature = "sink")]
macro_rules! delegate_sink {
($field:ident, $item:ty) => {
@@ -336,10 +338,9 @@ pub use crate::io::{
#[cfg(feature = "alloc")]
pub mod lock;
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- mod abortable;
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod abortable;
mod fns;
mod unfold_state;
diff --git a/src/lock/mod.rs b/src/lock/mod.rs
index 071eef6..cf374c0 100644
--- a/src/lock/mod.rs
+++ b/src/lock/mod.rs
@@ -3,20 +3,23 @@
//! This module is only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.
-cfg_target_has_atomic! {
- #[cfg(feature = "std")]
- mod mutex;
- #[cfg(feature = "std")]
- pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard};
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "std")]
+mod mutex;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "std")]
+pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
- #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
- #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
- #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
- mod bilock;
- #[cfg(feature = "bilock")]
- #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
- pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
- #[cfg(any(feature = "sink", feature = "io"))]
- #[cfg(not(feature = "bilock"))]
- pub(crate) use self::bilock::BiLock;
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
+#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
+mod bilock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(any(feature = "sink", feature = "io"))]
+#[cfg(not(feature = "bilock"))]
+pub(crate) use self::bilock::BiLock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
+pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs
index a849aee..85dcb15 100644
--- a/src/lock/mutex.rs
+++ b/src/lock/mutex.rs
@@ -66,7 +66,6 @@ impl Waiter {
}
}
-#[allow(clippy::identity_op)] // https://github.com/rust-lang/rust-clippy/issues/3445
const IS_LOCKED: usize = 1 << 0;
const HAS_WAITERS: usize = 1 << 1;
diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs
index c6ea548..4aa6c36 100644
--- a/src/sink/buffer.rs
+++ b/src/sink/buffer.rs
@@ -91,14 +91,12 @@ impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> {
}
}
- #[allow(clippy::debug_assert_with_mut_call)]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().try_empty_buffer(cx))?;
debug_assert!(self.buf.is_empty());
self.project().sink.poll_flush(cx)
}
- #[allow(clippy::debug_assert_with_mut_call)]
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().try_empty_buffer(cx))?;
debug_assert!(self.buf.is_empty());
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index a25fbe0..4a05d88 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -97,7 +97,7 @@ impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
// Each task is wrapped in an `Arc` and thereby atomically reference counted.
// Also, each task contains an `AtomicBool` which acts as a flag that indicates
// whether the task is currently inserted in the atomic queue. When a wake-up
-// notifiaction is received, the task will only be inserted into the ready to
+// notification is received, the task will only be inserted into the ready to
// run queue if it isn't inserted already.
impl<Fut> Default for FuturesUnordered<Fut> {
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index 0b2fc90..8cf9f80 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -62,6 +62,9 @@ pub use self::try_stream::IntoAsyncRead;
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
+#[cfg(feature = "alloc")]
+pub use self::try_stream::{TryChunks, TryChunksError};
+
// Primitive streams
mod iter;
@@ -88,33 +91,44 @@ pub use self::poll_fn::{poll_fn, PollFn};
mod select;
pub use self::select::{select, Select};
+mod select_with_strategy;
+pub use self::select_with_strategy::{select_with_strategy, PollNext, SelectWithStrategy};
+
mod unfold;
pub use self::unfold::{unfold, Unfold};
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- mod futures_ordered;
- #[cfg(feature = "alloc")]
- pub use self::futures_ordered::FuturesOrdered;
-
- #[cfg(feature = "alloc")]
- pub mod futures_unordered;
- #[cfg(feature = "alloc")]
- #[doc(inline)]
- pub use self::futures_unordered::FuturesUnordered;
-
- #[cfg(feature = "alloc")]
- pub mod select_all;
- #[cfg(feature = "alloc")]
- pub use self::select_all::{select_all, SelectAll};
-
- #[cfg(feature = "alloc")]
- mod abortable;
- #[cfg(feature = "alloc")]
- pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted};
- #[cfg(feature = "alloc")]
- pub use abortable::abortable;
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod futures_ordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use self::futures_ordered::FuturesOrdered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub mod futures_unordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[doc(inline)]
+pub use self::futures_unordered::FuturesUnordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub mod select_all;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[doc(inline)]
+pub use self::select_all::{select_all, SelectAll};
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod abortable;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted};
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use abortable::abortable;
// Just a helper function to ensure the streams we're returning all have the
// right implementations.
diff --git a/src/stream/select.rs b/src/stream/select.rs
index 133ac6c..0c1e3af 100644
--- a/src/stream/select.rs
+++ b/src/stream/select.rs
@@ -1,5 +1,5 @@
use super::assert_stream;
-use crate::stream::{Fuse, StreamExt};
+use crate::stream::{select_with_strategy, PollNext, SelectWithStrategy};
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
@@ -11,10 +11,7 @@ pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct Select<St1, St2> {
#[pin]
- stream1: Fuse<St1>,
- #[pin]
- stream2: Fuse<St2>,
- flag: bool,
+ inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext>,
}
}
@@ -22,21 +19,42 @@ pin_project! {
/// stream will be polled in a round-robin fashion, and whenever a stream is
/// ready to yield an item that item is yielded.
///
-/// After one of the two input stream completes, the remaining one will be
+/// After one of the two input streams completes, the remaining one will be
/// polled exclusively. The returned stream completes when both input
/// streams have completed.
///
/// Note that this function consumes both streams and returns a wrapped
/// version of them.
+///
+/// ## Examples
+///
+/// ```rust
+/// # futures::executor::block_on(async {
+/// use futures::stream::{ repeat, select, StreamExt };
+///
+/// let left = repeat(1);
+/// let right = repeat(2);
+///
+/// let mut out = select(left, right);
+///
+/// for _ in 0..100 {
+/// // We should be alternating.
+/// assert_eq!(1, out.select_next_some().await);
+/// assert_eq!(2, out.select_next_some().await);
+/// }
+/// # });
+/// ```
pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
+ fn round_robin(last: &mut PollNext) -> PollNext {
+ last.toggle()
+ }
+
assert_stream::<St1::Item, _>(Select {
- stream1: stream1.fuse(),
- stream2: stream2.fuse(),
- flag: false,
+ inner: select_with_strategy(stream1, stream2, round_robin),
})
}
@@ -44,7 +62,7 @@ impl<St1, St2> Select<St1, St2> {
/// Acquires a reference to the underlying streams that this combinator is
/// pulling from.
pub fn get_ref(&self) -> (&St1, &St2) {
- (self.stream1.get_ref(), self.stream2.get_ref())
+ self.inner.get_ref()
}
/// Acquires a mutable reference to the underlying streams that this
@@ -53,7 +71,7 @@ impl<St1, St2> Select<St1, St2> {
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
- (self.stream1.get_mut(), self.stream2.get_mut())
+ self.inner.get_mut()
}
/// Acquires a pinned mutable reference to the underlying streams that this
@@ -63,7 +81,7 @@ impl<St1, St2> Select<St1, St2> {
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
let this = self.project();
- (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
+ this.inner.get_pin_mut()
}
/// Consumes this combinator, returning the underlying streams.
@@ -71,7 +89,7 @@ impl<St1, St2> Select<St1, St2> {
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> (St1, St2) {
- (self.stream1.into_inner(), self.stream2.into_inner())
+ self.inner.into_inner()
}
}
@@ -81,7 +99,7 @@ where
St2: Stream<Item = St1::Item>,
{
fn is_terminated(&self) -> bool {
- self.stream1.is_terminated() && self.stream2.is_terminated()
+ self.inner.is_terminated()
}
}
@@ -94,37 +112,6 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
let this = self.project();
- if !*this.flag {
- poll_inner(this.flag, this.stream1, this.stream2, cx)
- } else {
- poll_inner(this.flag, this.stream2, this.stream1, cx)
- }
- }
-}
-
-fn poll_inner<St1, St2>(
- flag: &mut bool,
- a: Pin<&mut St1>,
- b: Pin<&mut St2>,
- cx: &mut Context<'_>,
-) -> Poll<Option<St1::Item>>
-where
- St1: Stream,
- St2: Stream<Item = St1::Item>,
-{
- let a_done = match a.poll_next(cx) {
- Poll::Ready(Some(item)) => {
- // give the other stream a chance to go first next time
- *flag = !*flag;
- return Poll::Ready(Some(item));
- }
- Poll::Ready(None) => true,
- Poll::Pending => false,
- };
-
- match b.poll_next(cx) {
- Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
- Poll::Ready(None) if a_done => Poll::Ready(None),
- Poll::Ready(None) | Poll::Pending => Poll::Pending,
+ this.inner.poll_next(cx)
}
}
diff --git a/src/stream/select_with_strategy.rs b/src/stream/select_with_strategy.rs
new file mode 100644
index 0000000..bd86990
--- /dev/null
+++ b/src/stream/select_with_strategy.rs
@@ -0,0 +1,229 @@
+use super::assert_stream;
+use crate::stream::{Fuse, StreamExt};
+use core::{fmt, pin::Pin};
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+/// Type to tell [`SelectWithStrategy`] which stream to poll next.
+#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
+pub enum PollNext {
+ /// Poll the first stream.
+ Left,
+ /// Poll the second stream.
+ Right,
+}
+
+impl PollNext {
+ /// Toggle the value and return the old one.
+ pub fn toggle(&mut self) -> Self {
+ let old = *self;
+
+ match self {
+ PollNext::Left => *self = PollNext::Right,
+ PollNext::Right => *self = PollNext::Left,
+ }
+
+ old
+ }
+}
+
+impl Default for PollNext {
+ fn default() -> Self {
+ PollNext::Left
+ }
+}
+
+pin_project! {
+ /// Stream for the [`select_with_strategy()`] function. See function docs for details.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct SelectWithStrategy<St1, St2, Clos, State> {
+ #[pin]
+ stream1: Fuse<St1>,
+ #[pin]
+ stream2: Fuse<St2>,
+ state: State,
+ clos: Clos,
+ }
+}
+
+/// This function will attempt to pull items from both streams. You provide a
+/// closure to tell [`SelectWithStrategy`] which stream to poll. The closure can
+/// store state on `SelectWithStrategy` to which it will receive a `&mut` on every
+/// invocation. This allows basing the strategy on prior choices.
+///
+/// After one of the two input streams completes, the remaining one will be
+/// polled exclusively. The returned stream completes when both input
+/// streams have completed.
+///
+/// Note that this function consumes both streams and returns a wrapped
+/// version of them.
+///
+/// ## Examples
+///
+/// ### Priority
+/// This example shows how to always prioritize the left stream.
+///
+/// ```rust
+/// # futures::executor::block_on(async {
+/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
+///
+/// let left = repeat(1);
+/// let right = repeat(2);
+///
+/// // We don't need any state, so let's make it an empty tuple.
+/// // We must provide some type here, as there is no way for the compiler
+/// // to infer it. As we don't need to capture variables, we can just
+/// // use a function pointer instead of a closure.
+/// fn prio_left(_: &mut ()) -> PollNext { PollNext::Left }
+///
+/// let mut out = select_with_strategy(left, right, prio_left);
+///
+/// for _ in 0..100 {
+/// // Whenever we poll out, we will alwas get `1`.
+/// assert_eq!(1, out.select_next_some().await);
+/// }
+/// # });
+/// ```
+///
+/// ### Round Robin
+/// This example shows how to select from both streams round robin.
+/// Note: this special case is provided by [`futures-util::stream::select`].
+///
+/// ```rust
+/// # futures::executor::block_on(async {
+/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
+///
+/// let left = repeat(1);
+/// let right = repeat(2);
+///
+/// let rrobin = |last: &mut PollNext| last.toggle();
+///
+/// let mut out = select_with_strategy(left, right, rrobin);
+///
+/// for _ in 0..100 {
+/// // We should be alternating now.
+/// assert_eq!(1, out.select_next_some().await);
+/// assert_eq!(2, out.select_next_some().await);
+/// }
+/// # });
+/// ```
+pub fn select_with_strategy<St1, St2, Clos, State>(
+ stream1: St1,
+ stream2: St2,
+ which: Clos,
+) -> SelectWithStrategy<St1, St2, Clos, State>
+where
+ St1: Stream,
+ St2: Stream<Item = St1::Item>,
+ Clos: FnMut(&mut State) -> PollNext,
+ State: Default,
+{
+ assert_stream::<St1::Item, _>(SelectWithStrategy {
+ stream1: stream1.fuse(),
+ stream2: stream2.fuse(),
+ state: Default::default(),
+ clos: which,
+ })
+}
+
+impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
+ /// Acquires a reference to the underlying streams that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> (&St1, &St2) {
+ (self.stream1.get_ref(), self.stream2.get_ref())
+ }
+
+ /// Acquires a mutable reference to the underlying streams that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
+ (self.stream1.get_mut(), self.stream2.get_mut())
+ }
+
+ /// Acquires a pinned mutable reference to the underlying streams that this
+ /// combinator is pulling from.
+ ///
+ /// Note that care must be taken to avoid tampering with the state of the
+ /// stream which may otherwise confuse this combinator.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
+ let this = self.project();
+ (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
+ }
+
+ /// Consumes this combinator, returning the underlying streams.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> (St1, St2) {
+ (self.stream1.into_inner(), self.stream2.into_inner())
+ }
+}
+
+impl<St1, St2, Clos, State> FusedStream for SelectWithStrategy<St1, St2, Clos, State>
+where
+ St1: Stream,
+ St2: Stream<Item = St1::Item>,
+ Clos: FnMut(&mut State) -> PollNext,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream1.is_terminated() && self.stream2.is_terminated()
+ }
+}
+
+impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
+where
+ St1: Stream,
+ St2: Stream<Item = St1::Item>,
+ Clos: FnMut(&mut State) -> PollNext,
+{
+ type Item = St1::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
+ let this = self.project();
+
+ match (this.clos)(this.state) {
+ PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
+ PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
+ }
+ }
+}
+
+fn poll_inner<St1, St2>(
+ a: Pin<&mut St1>,
+ b: Pin<&mut St2>,
+ cx: &mut Context<'_>,
+) -> Poll<Option<St1::Item>>
+where
+ St1: Stream,
+ St2: Stream<Item = St1::Item>,
+{
+ let a_done = match a.poll_next(cx) {
+ Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
+ Poll::Ready(None) => true,
+ Poll::Pending => false,
+ };
+
+ match b.poll_next(cx) {
+ Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
+ Poll::Ready(None) if a_done => Poll::Ready(None),
+ Poll::Ready(None) | Poll::Pending => Poll::Pending,
+ }
+}
+
+impl<St1, St2, Clos, State> fmt::Debug for SelectWithStrategy<St1, St2, Clos, State>
+where
+ St1: fmt::Debug,
+ St2: fmt::Debug,
+ State: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("SelectWithStrategy")
+ .field("stream1", &self.stream1)
+ .field("stream2", &self.stream2)
+ .field("state", &self.state)
+ .finish()
+ }
+}
diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs
new file mode 100644
index 0000000..ba2baa5
--- /dev/null
+++ b/src/stream/stream/all.rs
@@ -0,0 +1,92 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`all`](super::StreamExt::all) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct All<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ accum: Option<bool>,
+ #[pin]
+ future: Option<Fut>,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for All<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("All")
+ .field("stream", &self.stream)
+ .field("accum", &self.accum)
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> All<St, Fut, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, accum: Some(true), future: None }
+ }
+}
+
+impl<St, Fut, F> FusedFuture for All<St, Fut, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ fn is_terminated(&self) -> bool {
+ self.accum.is_none() && self.future.is_none()
+ }
+}
+
+impl<St, Fut, F> Future for All<St, Fut, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
+ let mut this = self.project();
+ Poll::Ready(loop {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
+ // we're currently processing a future to produce a new accum value
+ let acc = this.accum.unwrap() && ready!(fut.poll(cx));
+ if !acc {
+ break false;
+ } // early exit
+ *this.accum = Some(acc);
+ this.future.set(None);
+ } else if this.accum.is_some() {
+ // we're waiting on a new item from the stream
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(item) => {
+ this.future.set(Some((this.f)(item)));
+ }
+ None => {
+ break this.accum.take().unwrap();
+ }
+ }
+ } else {
+ panic!("All polled after completion")
+ }
+ })
+ }
+}
diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs
new file mode 100644
index 0000000..f023125
--- /dev/null
+++ b/src/stream/stream/any.rs
@@ -0,0 +1,92 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`any`](super::StreamExt::any) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Any<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ accum: Option<bool>,
+ #[pin]
+ future: Option<Fut>,
+ }
+}
+
+impl<St, Fut, F> fmt::Debug for Any<St, Fut, F>
+where
+ St: fmt::Debug,
+ Fut: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Any")
+ .field("stream", &self.stream)
+ .field("accum", &self.accum)
+ .field("future", &self.future)
+ .finish()
+ }
+}
+
+impl<St, Fut, F> Any<St, Fut, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, accum: Some(false), future: None }
+ }
+}
+
+impl<St, Fut, F> FusedFuture for Any<St, Fut, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ fn is_terminated(&self) -> bool {
+ self.accum.is_none() && self.future.is_none()
+ }
+}
+
+impl<St, Fut, F> Future for Any<St, Fut, F>
+where
+ St: Stream,
+ F: FnMut(St::Item) -> Fut,
+ Fut: Future<Output = bool>,
+{
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
+ let mut this = self.project();
+ Poll::Ready(loop {
+ if let Some(fut) = this.future.as_mut().as_pin_mut() {
+ // we're currently processing a future to produce a new accum value
+ let acc = this.accum.unwrap() || ready!(fut.poll(cx));
+ if acc {
+ break true;
+ } // early exit
+ *this.accum = Some(acc);
+ this.future.set(None);
+ } else if this.accum.is_some() {
+ // we're waiting on a new item from the stream
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(item) => {
+ this.future.set(Some((this.f)(item)));
+ }
+ None => {
+ break this.accum.take().unwrap();
+ }
+ }
+ } else {
+ panic!("Any polled after completion")
+ }
+ })
+ }
+}
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index 9089e6e..b3b0155 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -70,6 +70,14 @@ mod fold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::fold::Fold;
+mod any;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::any::Any;
+
+mod all;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::all::All;
+
#[cfg(feature = "sink")]
mod forward;
@@ -169,35 +177,41 @@ mod scan;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::scan::Scan;
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- mod buffer_unordered;
- #[cfg(feature = "alloc")]
- #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
- pub use self::buffer_unordered::BufferUnordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod buffer_unordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::buffer_unordered::BufferUnordered;
- #[cfg(feature = "alloc")]
- mod buffered;
- #[cfg(feature = "alloc")]
- #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
- pub use self::buffered::Buffered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod buffered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::buffered::Buffered;
- #[cfg(feature = "alloc")]
- mod for_each_concurrent;
- #[cfg(feature = "alloc")]
- #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
- pub use self::for_each_concurrent::ForEachConcurrent;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod for_each_concurrent;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::for_each_concurrent::ForEachConcurrent;
- #[cfg(feature = "sink")]
- #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
- #[cfg(feature = "alloc")]
- mod split;
- #[cfg(feature = "sink")]
- #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
- #[cfg(feature = "alloc")]
- #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
- pub use self::split::{SplitStream, SplitSink, ReuniteError};
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+#[cfg(feature = "alloc")]
+mod split;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::split::{ReuniteError, SplitSink, SplitStream};
#[cfg(feature = "std")]
mod catch_unwind;
@@ -621,6 +635,50 @@ pub trait StreamExt: Stream {
assert_future::<T, _>(Fold::new(self, f, init))
}
+ /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let number_stream = stream::iter(0..10);
+ /// let contain_three = number_stream.any(|i| async move { i == 3 });
+ /// assert_eq!(contain_three.await, true);
+ /// # });
+ /// ```
+ fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
+ where
+ F: FnMut(Self::Item) -> Fut,
+ Fut: Future<Output = bool>,
+ Self: Sized,
+ {
+ assert_future::<bool, _>(Any::new(self, f))
+ }
+
+ /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let number_stream = stream::iter(0..10);
+ /// let less_then_twenty = number_stream.all(|i| async move { i < 20 });
+ /// assert_eq!(less_then_twenty.await, true);
+ /// # });
+ /// ```
+ fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
+ where
+ F: FnMut(Self::Item) -> Fut,
+ Fut: Future<Output = bool>,
+ Self: Sized,
+ {
+ assert_future::<bool, _>(All::new(self, f))
+ }
+
/// Flattens a stream of streams into just one continuous stream.
///
/// # Examples
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 11cd9c0..455ddca 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -12,6 +12,8 @@ use crate::fns::{
use crate::future::assert_future;
use crate::stream::assert_stream;
use crate::stream::{Inspect, Map};
+#[cfg(feature = "alloc")]
+use alloc::vec::Vec;
use core::pin::Pin;
use futures_core::{
future::{Future, TryFuture},
@@ -94,6 +96,12 @@ mod try_concat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_concat::TryConcat;
+#[cfg(feature = "alloc")]
+mod try_chunks;
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_chunks::{TryChunks, TryChunksError};
+
mod try_fold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_fold::TryFold;
@@ -110,25 +118,29 @@ mod try_take_while;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_take_while::TryTakeWhile;
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- mod try_buffer_unordered;
- #[cfg(feature = "alloc")]
- #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
- pub use self::try_buffer_unordered::TryBufferUnordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_buffer_unordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_buffer_unordered::TryBufferUnordered;
- #[cfg(feature = "alloc")]
- mod try_buffered;
- #[cfg(feature = "alloc")]
- #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
- pub use self::try_buffered::TryBuffered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_buffered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_buffered::TryBuffered;
- #[cfg(feature = "alloc")]
- mod try_for_each_concurrent;
- #[cfg(feature = "alloc")]
- #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
- pub use self::try_for_each_concurrent::TryForEachConcurrent;
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_for_each_concurrent;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_for_each_concurrent::TryForEachConcurrent;
#[cfg(feature = "io")]
#[cfg(feature = "std")]
@@ -572,6 +584,53 @@ pub trait TryStreamExt: TryStream {
assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
}
+ /// An adaptor for chunking up successful items of the stream inside a vector.
+ ///
+ /// This combinator will attempt to pull successful items from this stream and buffer
+ /// them into a local vector. At most `capacity` items will get buffered
+ /// before they're yielded from the returned stream.
+ ///
+ /// Note that the vectors returned from this iterator may not always have
+ /// `capacity` elements. If the underlying stream ended and only a partial
+ /// vector was created, it'll be returned. Additionally if an error happens
+ /// from the underlying stream then the currently buffered items will be
+ /// yielded.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// This function is similar to
+ /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
+ /// early if an error occurs.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, TryChunksError, TryStreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
+ /// let mut stream = stream.try_chunks(2);
+ ///
+ /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
+ /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
+ /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
+ /// # })
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `capacity` is zero.
+ #[cfg(feature = "alloc")]
+ fn try_chunks(self, capacity: usize) -> TryChunks<Self>
+ where
+ Self: Sized,
+ {
+ assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
+ TryChunks::new(self, capacity),
+ )
+ }
+
/// Attempt to filter the values produced by this stream according to the
/// provided asynchronous closure.
///
diff --git a/src/stream/try_stream/try_chunks.rs b/src/stream/try_stream/try_chunks.rs
new file mode 100644
index 0000000..07d4425
--- /dev/null
+++ b/src/stream/try_stream/try_chunks.rs
@@ -0,0 +1,131 @@
+use crate::stream::{Fuse, IntoStream, StreamExt};
+
+use alloc::vec::Vec;
+use core::pin::Pin;
+use core::{fmt, mem};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryChunks<St: TryStream> {
+ #[pin]
+ stream: Fuse<IntoStream<St>>,
+ items: Vec<St::Ok>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
+}
+
+impl<St: TryStream> TryChunks<St> {
+ pub(super) fn new(stream: St, capacity: usize) -> Self {
+ assert!(capacity > 0);
+
+ Self {
+ stream: IntoStream::new(stream).fuse(),
+ items: Vec::with_capacity(capacity),
+ cap: capacity,
+ }
+ }
+
+ fn take(self: Pin<&mut Self>) -> Vec<St::Ok> {
+ let cap = self.cap;
+ mem::replace(self.project().items, Vec::with_capacity(cap))
+ }
+
+ delegate_access_inner!(stream, St, (. .));
+}
+
+impl<St: TryStream> Stream for TryChunks<St> {
+ #[allow(clippy::type_complexity)]
+ type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.as_mut().project();
+ loop {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
+ // Push the item into the buffer and check whether it is full.
+ // If so, replace our buffer with a new and empty one and return
+ // the full one.
+ Some(item) => match item {
+ Ok(item) => {
+ this.items.push(item);
+ if this.items.len() >= *this.cap {
+ return Poll::Ready(Some(Ok(self.take())));
+ }
+ }
+ Err(e) => {
+ return Poll::Ready(Some(Err(TryChunksError(self.take(), e))));
+ }
+ },
+
+ // Since the underlying stream ran out of values, return what we
+ // have buffered, if we have anything.
+ None => {
+ let last = if this.items.is_empty() {
+ None
+ } else {
+ let full_buf = mem::replace(this.items, Vec::new());
+ Some(full_buf)
+ };
+
+ return Poll::Ready(last.map(Ok));
+ }
+ }
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let (lower, upper) = self.stream.size_hint();
+ let lower = lower.saturating_add(chunk_len);
+ let upper = match upper {
+ Some(x) => x.checked_add(chunk_len),
+ None => None,
+ };
+ (lower, upper)
+ }
+}
+
+impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated() && self.items.is_empty()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Item> Sink<Item> for TryChunks<S>
+where
+ S: TryStream + Sink<Item>,
+{
+ type Error = <S as Sink<Item>>::Error;
+
+ delegate_sink!(stream, Item);
+}
+
+/// Error indicating, that while chunk was collected inner stream produced an error.
+///
+/// Contains all items that were collected before an error occurred, and the stream error itself.
+#[derive(PartialEq, Eq)]
+pub struct TryChunksError<T, E>(pub Vec<T>, pub E);
+
+impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.1.fmt(f)
+ }
+}
+
+impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.1.fmt(f)
+ }
+}
+
+#[cfg(feature = "std")]
+impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {}
diff --git a/src/task/mod.rs b/src/task/mod.rs
index c4afe30..eff6d48 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -19,18 +19,20 @@ pub use futures_task::noop_waker;
#[cfg(feature = "std")]
pub use futures_task::noop_waker_ref;
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- pub use futures_task::ArcWake;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use futures_task::ArcWake;
- #[cfg(feature = "alloc")]
- pub use futures_task::waker;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use futures_task::waker;
- #[cfg(feature = "alloc")]
- pub use futures_task::{waker_ref, WakerRef};
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use futures_task::{waker_ref, WakerRef};
- pub use futures_core::task::__internal::AtomicWaker;
-}
+#[cfg(not(futures_no_atomic_cas))]
+pub use futures_core::task::__internal::AtomicWaker;
mod spawn;
pub use self::spawn::{LocalSpawnExt, SpawnExt};