diff options
author | Joel Galenson <jgalenson@google.com> | 2021-08-09 10:32:56 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-08-09 10:32:56 -0700 |
commit | 41d5e42e28697b0d96aeaf171e603c1c9c606bd9 (patch) | |
tree | f953b1049d550955e09fc83d2806cf23c73cd3fe | |
parent | 590d610b243cead0fb895039731f4312f9903db1 (diff) | |
download | futures-util-41d5e42e28697b0d96aeaf171e603c1c9c606bd9.tar.gz |
Upgrade rust/crates/futures-util to 0.3.16
Test: make
Change-Id: I2b7f571745922fbf3e3eba05c872ddadbfba5dd7
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 30 | ||||
-rw-r--r-- | Cargo.toml | 23 | ||||
-rw-r--r-- | Cargo.toml.orig | 14 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | build.rs | 2 | ||||
-rw-r--r-- | no_atomic_cas.rs | 2 | ||||
-rw-r--r-- | src/future/mod.rs | 17 | ||||
-rw-r--r-- | src/io/fill_buf.rs | 2 | ||||
-rw-r--r-- | src/io/into_sink.rs | 1 | ||||
-rw-r--r-- | src/io/write_all_vectored.rs | 8 | ||||
-rw-r--r-- | src/lib.rs | 33 | ||||
-rw-r--r-- | src/lock/mod.rs | 35 | ||||
-rw-r--r-- | src/lock/mutex.rs | 1 | ||||
-rw-r--r-- | src/sink/buffer.rs | 2 | ||||
-rw-r--r-- | src/stream/futures_unordered/mod.rs | 2 | ||||
-rw-r--r-- | src/stream/mod.rs | 62 | ||||
-rw-r--r-- | src/stream/select.rs | 79 | ||||
-rw-r--r-- | src/stream/select_with_strategy.rs | 229 | ||||
-rw-r--r-- | src/stream/stream/all.rs | 92 | ||||
-rw-r--r-- | src/stream/stream/any.rs | 92 | ||||
-rw-r--r-- | src/stream/stream/mod.rs | 110 | ||||
-rw-r--r-- | src/stream/try_stream/mod.rs | 93 | ||||
-rw-r--r-- | src/stream/try_stream/try_chunks.rs | 131 | ||||
-rw-r--r-- | src/task/mod.rs | 20 |
25 files changed, 878 insertions, 212 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index ec6442e..99dc8b0 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "fc080d153bc7bf00429ec5e2b91e2f21f2243846" + "sha1": "ab38fd29d3f84f8fc028fa7883e53dba423da0ee" } } @@ -38,7 +38,7 @@ license { } rust_defaults { - name: "futures-util_defaults", + name: "futures-util_test_defaults", crate_name: "futures_util", srcs: ["src/lib.rs"], test_suites: ["general-tests"], @@ -84,7 +84,7 @@ rust_defaults { rust_test_host { name: "futures-util_host_test_src_lib", - defaults: ["futures-util_defaults"], + defaults: ["futures-util_test_defaults"], test_options: { unit_test: true, }, @@ -92,7 +92,7 @@ rust_test_host { rust_test { name: "futures-util_device_test_src_lib", - defaults: ["futures-util_defaults"], + defaults: ["futures-util_test_defaults"], } rust_library { @@ -150,21 +150,21 @@ rust_library { // bytes-0.4.12 // cfg-if-0.1.10 // cfg-if-1.0.0 -// crossbeam-deque-0.7.3 +// crossbeam-deque-0.7.4 // crossbeam-epoch-0.8.2 "default,lazy_static,std" // crossbeam-queue-0.2.3 "default,std" // crossbeam-utils-0.7.2 "default,lazy_static,std" // fnv-1.0.7 "default,std" // futures-0.1.31 "default,use_std,with-deprecated" -// futures-channel-0.3.15 "alloc,std" -// futures-core-0.3.15 "alloc,std" -// futures-io-0.3.15 "std" -// futures-macro-0.3.15 -// futures-sink-0.3.15 -// futures-task-0.3.15 "alloc,std" +// futures-channel-0.3.16 "alloc,std" +// futures-core-0.3.16 "alloc,std" +// futures-io-0.3.16 "std" +// futures-macro-0.3.16 +// futures-sink-0.3.16 +// futures-task-0.3.16 "alloc,std" // iovec-0.1.4 // lazy_static-1.4.0 -// libc-0.2.94 "default,std" +// libc-0.2.98 "default,std" // lock_api-0.3.4 // log-0.4.14 // maybe-uninit-2.0.0 @@ -176,19 +176,19 @@ rust_library { // num_cpus-1.13.0 // parking_lot-0.9.0 "default" // parking_lot_core-0.6.2 -// pin-project-lite-0.2.6 +// pin-project-lite-0.2.7 // pin-utils-0.1.0 // proc-macro-hack-0.5.19 // proc-macro-nested-0.1.7 -// proc-macro2-1.0.26 "default,proc-macro" +// proc-macro2-1.0.28 "default,proc-macro" // quote-1.0.9 "default,proc-macro" // rustc_version-0.2.3 // scopeguard-1.1.0 // semver-0.9.0 "default" // semver-parser-0.7.0 -// slab-0.4.3 "default,std" +// slab-0.4.4 "default,std" // smallvec-0.6.14 "default,std" -// syn-1.0.72 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote" +// syn-1.0.74 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote" // tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds" // tokio-codec-0.1.2 // tokio-current-thread-0.1.7 @@ -3,17 +3,16 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" name = "futures-util" -version = "0.3.15" +version = "0.3.16" authors = ["Alex Crichton <alex@alexcrichton.com>"] description = "Common utilities and extension traits for the futures-rs library.\n" homepage = "https://rust-lang.github.io/futures-rs" @@ -24,33 +23,33 @@ repository = "https://github.com/rust-lang/futures-rs" all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies.futures-channel] -version = "0.3.15" +version = "0.3.16" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.15" +version = "0.3.16" default-features = false [dependencies.futures-io] -version = "0.3.15" +version = "0.3.16" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.15" +version = "=0.3.16" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.15" +version = "0.3.16" optional = true default-features = false [dependencies.futures-task] -version = "0.3.15" +version = "0.3.16" default-features = false [dependencies.futures_01] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index de77fda..98cace6 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,7 +1,7 @@ [package] name = "futures-util" edition = "2018" -version = "0.3.15" +version = "0.3.16" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" @@ -39,12 +39,12 @@ cfg-target-has-atomic = [] autocfg = "1" [dependencies] -futures-core = { path = "../futures-core", version = "0.3.15", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.15", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.15", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.15", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.15", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.15", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.16", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.16", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.16", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.16", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.16", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.16", default-features = false, optional = true } proc-macro-hack = { version = "0.5.19", optional = true } proc-macro-nested = { version = "0.1.2", optional = true } slab = { version = "0.4.2", optional = true } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.15.crate" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.16.crate" } - version: "0.3.15" + version: "0.3.16" license_type: NOTICE last_upgrade_date { year: 2021 - month: 5 - day: 19 + month: 8 + day: 9 } } @@ -9,7 +9,7 @@ include!("no_atomic_cas.rs"); // and outside of the normal semver guarantees: // // - `futures_no_atomic_cas` -// Assume the target does not have atomic CAS (compare-and-swap). +// Assume the target does *not* support atomic CAS operations. // This is usually detected automatically by the build script, but you may // need to enable it manually when building for custom targets or using // non-cargo build systems that don't run the build script. diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs index 0819af1..4708bf8 100644 --- a/no_atomic_cas.rs +++ b/no_atomic_cas.rs @@ -3,6 +3,8 @@ const NO_ATOMIC_CAS_TARGETS: &[&str] = &[ "avr-unknown-gnu-atmega328", + "bpfeb-unknown-none", + "bpfel-unknown-none", "msp430-none-elf", "riscv32i-unknown-none-elf", "riscv32imc-unknown-none-elf", diff --git a/src/future/mod.rs b/src/future/mod.rs index 7a63e5f..cd08264 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -108,14 +108,15 @@ pub use self::select_ok::{select_ok, SelectOk}; mod either; pub use self::either::Either; -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - mod abortable; - #[cfg(feature = "alloc")] - pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted}; - #[cfg(feature = "alloc")] - pub use abortable::abortable; -} +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod abortable; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use abortable::abortable; // Just a helper function to ensure the futures we're returning all have the // right implementations. diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs index 19b0d20..a1484c0 100644 --- a/src/io/fill_buf.rs +++ b/src/io/fill_buf.rs @@ -30,7 +30,7 @@ where let reader = this.reader.take().expect("Polled FillBuf after completion"); match Pin::new(&mut *reader).poll_fill_buf(cx) { - // With polinius it is possible to remove this inner match and just have the correct + // With polonius it is possible to remove this inner match and just have the correct // lifetime of the reference inferred based on which branch is taken Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) { Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)), diff --git a/src/io/into_sink.rs b/src/io/into_sink.rs index 384b8e3..6a41ee2 100644 --- a/src/io/into_sink.rs +++ b/src/io/into_sink.rs @@ -62,7 +62,6 @@ impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> { Poll::Ready(Ok(())) } - #[allow(clippy::debug_assert_with_mut_call)] fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { debug_assert!(self.buffer.is_none()); *self.project().buffer = Some(Block { offset: 0, bytes: item }); diff --git a/src/io/write_all_vectored.rs b/src/io/write_all_vectored.rs index f465209..a8fc4c6 100644 --- a/src/io/write_all_vectored.rs +++ b/src/io/write_all_vectored.rs @@ -4,7 +4,6 @@ use futures_core::task::{Context, Poll}; use futures_io::AsyncWrite; use futures_io::IoSlice; use std::io; -use std::mem; use std::pin::Pin; /// Future for the @@ -19,8 +18,9 @@ pub struct WriteAllVectored<'a, W: ?Sized + Unpin> { impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {} impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> { - pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self { - Self { writer, bufs: IoSlice::advance(bufs, 0) } + pub(super) fn new(writer: &'a mut W, mut bufs: &'a mut [IoSlice<'a>]) -> Self { + IoSlice::advance_slices(&mut bufs, 0); + Self { writer, bufs } } } @@ -34,7 +34,7 @@ impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAllVectored<'_, W> { if n == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); } else { - this.bufs = IoSlice::advance(mem::take(&mut this.bufs), n); + IoSlice::advance_slices(&mut this.bufs, n); } } @@ -4,11 +4,20 @@ #![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] #![cfg_attr(not(feature = "std"), no_std)] -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] -// It cannot be included in the published code because this lints have false positives in the minimum required version. -#![cfg_attr(test, warn(single_use_lifetimes))] -#![warn(clippy::all)] -#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + single_use_lifetimes, + unreachable_pub +)] +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms, single_use_lifetimes), + allow(dead_code, unused_assignments, unused_variables) + ) +))] #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(all(feature = "bilock", not(feature = "unstable")))] @@ -47,13 +56,6 @@ pub mod __private { } } -macro_rules! cfg_target_has_atomic { - ($($item:item)*) => {$( - #[cfg(not(futures_no_atomic_cas))] - $item - )*}; -} - #[cfg(feature = "sink")] macro_rules! delegate_sink { ($field:ident, $item:ty) => { @@ -336,10 +338,9 @@ pub use crate::io::{ #[cfg(feature = "alloc")] pub mod lock; -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - mod abortable; -} +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod abortable; mod fns; mod unfold_state; diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 071eef6..cf374c0 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -3,20 +3,23 @@ //! This module is only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -cfg_target_has_atomic! { - #[cfg(feature = "std")] - mod mutex; - #[cfg(feature = "std")] - pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard}; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "std")] +mod mutex; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "std")] +pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; - #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] - #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] - #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] - mod bilock; - #[cfg(feature = "bilock")] - #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] - pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; - #[cfg(any(feature = "sink", feature = "io"))] - #[cfg(not(feature = "bilock"))] - pub(crate) use self::bilock::BiLock; -} +#[cfg(not(futures_no_atomic_cas))] +#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] +#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] +mod bilock; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(any(feature = "sink", feature = "io"))] +#[cfg(not(feature = "bilock"))] +pub(crate) use self::bilock::BiLock; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "bilock")] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] +pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs index a849aee..85dcb15 100644 --- a/src/lock/mutex.rs +++ b/src/lock/mutex.rs @@ -66,7 +66,6 @@ impl Waiter { } } -#[allow(clippy::identity_op)] // https://github.com/rust-lang/rust-clippy/issues/3445 const IS_LOCKED: usize = 1 << 0; const HAS_WAITERS: usize = 1 << 1; diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs index c6ea548..4aa6c36 100644 --- a/src/sink/buffer.rs +++ b/src/sink/buffer.rs @@ -91,14 +91,12 @@ impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { } } - #[allow(clippy::debug_assert_with_mut_call)] fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().try_empty_buffer(cx))?; debug_assert!(self.buf.is_empty()); self.project().sink.poll_flush(cx) } - #[allow(clippy::debug_assert_with_mut_call)] fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().try_empty_buffer(cx))?; debug_assert!(self.buf.is_empty()); diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index a25fbe0..4a05d88 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -97,7 +97,7 @@ impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> { // Each task is wrapped in an `Arc` and thereby atomically reference counted. // Also, each task contains an `AtomicBool` which acts as a flag that indicates // whether the task is currently inserted in the atomic queue. When a wake-up -// notifiaction is received, the task will only be inserted into the ready to +// notification is received, the task will only be inserted into the ready to // run queue if it isn't inserted already. impl<Fut> Default for FuturesUnordered<Fut> { diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 0b2fc90..8cf9f80 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -62,6 +62,9 @@ pub use self::try_stream::IntoAsyncRead; #[cfg(feature = "alloc")] pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; +#[cfg(feature = "alloc")] +pub use self::try_stream::{TryChunks, TryChunksError}; + // Primitive streams mod iter; @@ -88,33 +91,44 @@ pub use self::poll_fn::{poll_fn, PollFn}; mod select; pub use self::select::{select, Select}; +mod select_with_strategy; +pub use self::select_with_strategy::{select_with_strategy, PollNext, SelectWithStrategy}; + mod unfold; pub use self::unfold::{unfold, Unfold}; -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - mod futures_ordered; - #[cfg(feature = "alloc")] - pub use self::futures_ordered::FuturesOrdered; - - #[cfg(feature = "alloc")] - pub mod futures_unordered; - #[cfg(feature = "alloc")] - #[doc(inline)] - pub use self::futures_unordered::FuturesUnordered; - - #[cfg(feature = "alloc")] - pub mod select_all; - #[cfg(feature = "alloc")] - pub use self::select_all::{select_all, SelectAll}; - - #[cfg(feature = "alloc")] - mod abortable; - #[cfg(feature = "alloc")] - pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted}; - #[cfg(feature = "alloc")] - pub use abortable::abortable; -} +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod futures_ordered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use self::futures_ordered::FuturesOrdered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub mod futures_unordered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[doc(inline)] +pub use self::futures_unordered::FuturesUnordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub mod select_all; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[doc(inline)] +pub use self::select_all::{select_all, SelectAll}; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod abortable; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use abortable::abortable; // Just a helper function to ensure the streams we're returning all have the // right implementations. diff --git a/src/stream/select.rs b/src/stream/select.rs index 133ac6c..0c1e3af 100644 --- a/src/stream/select.rs +++ b/src/stream/select.rs @@ -1,5 +1,5 @@ use super::assert_stream; -use crate::stream::{Fuse, StreamExt}; +use crate::stream::{select_with_strategy, PollNext, SelectWithStrategy}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -11,10 +11,7 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct Select<St1, St2> { #[pin] - stream1: Fuse<St1>, - #[pin] - stream2: Fuse<St2>, - flag: bool, + inner: SelectWithStrategy<St1, St2, fn(&mut PollNext)-> PollNext, PollNext>, } } @@ -22,21 +19,42 @@ pin_project! { /// stream will be polled in a round-robin fashion, and whenever a stream is /// ready to yield an item that item is yielded. /// -/// After one of the two input stream completes, the remaining one will be +/// After one of the two input streams completes, the remaining one will be /// polled exclusively. The returned stream completes when both input /// streams have completed. /// /// Note that this function consumes both streams and returns a wrapped /// version of them. +/// +/// ## Examples +/// +/// ```rust +/// # futures::executor::block_on(async { +/// use futures::stream::{ repeat, select, StreamExt }; +/// +/// let left = repeat(1); +/// let right = repeat(2); +/// +/// let mut out = select(left, right); +/// +/// for _ in 0..100 { +/// // We should be alternating. +/// assert_eq!(1, out.select_next_some().await); +/// assert_eq!(2, out.select_next_some().await); +/// } +/// # }); +/// ``` pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2> where St1: Stream, St2: Stream<Item = St1::Item>, { + fn round_robin(last: &mut PollNext) -> PollNext { + last.toggle() + } + assert_stream::<St1::Item, _>(Select { - stream1: stream1.fuse(), - stream2: stream2.fuse(), - flag: false, + inner: select_with_strategy(stream1, stream2, round_robin), }) } @@ -44,7 +62,7 @@ impl<St1, St2> Select<St1, St2> { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { - (self.stream1.get_ref(), self.stream2.get_ref()) + self.inner.get_ref() } /// Acquires a mutable reference to the underlying streams that this @@ -53,7 +71,7 @@ impl<St1, St2> Select<St1, St2> { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { - (self.stream1.get_mut(), self.stream2.get_mut()) + self.inner.get_mut() } /// Acquires a pinned mutable reference to the underlying streams that this @@ -63,7 +81,7 @@ impl<St1, St2> Select<St1, St2> { /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { let this = self.project(); - (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + this.inner.get_pin_mut() } /// Consumes this combinator, returning the underlying streams. @@ -71,7 +89,7 @@ impl<St1, St2> Select<St1, St2> { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> (St1, St2) { - (self.stream1.into_inner(), self.stream2.into_inner()) + self.inner.into_inner() } } @@ -81,7 +99,7 @@ where St2: Stream<Item = St1::Item>, { fn is_terminated(&self) -> bool { - self.stream1.is_terminated() && self.stream2.is_terminated() + self.inner.is_terminated() } } @@ -94,37 +112,6 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { let this = self.project(); - if !*this.flag { - poll_inner(this.flag, this.stream1, this.stream2, cx) - } else { - poll_inner(this.flag, this.stream2, this.stream1, cx) - } - } -} - -fn poll_inner<St1, St2>( - flag: &mut bool, - a: Pin<&mut St1>, - b: Pin<&mut St2>, - cx: &mut Context<'_>, -) -> Poll<Option<St1::Item>> -where - St1: Stream, - St2: Stream<Item = St1::Item>, -{ - let a_done = match a.poll_next(cx) { - Poll::Ready(Some(item)) => { - // give the other stream a chance to go first next time - *flag = !*flag; - return Poll::Ready(Some(item)); - } - Poll::Ready(None) => true, - Poll::Pending => false, - }; - - match b.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) if a_done => Poll::Ready(None), - Poll::Ready(None) | Poll::Pending => Poll::Pending, + this.inner.poll_next(cx) } } diff --git a/src/stream/select_with_strategy.rs b/src/stream/select_with_strategy.rs new file mode 100644 index 0000000..bd86990 --- /dev/null +++ b/src/stream/select_with_strategy.rs @@ -0,0 +1,229 @@ +use super::assert_stream; +use crate::stream::{Fuse, StreamExt}; +use core::{fmt, pin::Pin}; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +/// Type to tell [`SelectWithStrategy`] which stream to poll next. +#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] +pub enum PollNext { + /// Poll the first stream. + Left, + /// Poll the second stream. + Right, +} + +impl PollNext { + /// Toggle the value and return the old one. + pub fn toggle(&mut self) -> Self { + let old = *self; + + match self { + PollNext::Left => *self = PollNext::Right, + PollNext::Right => *self = PollNext::Left, + } + + old + } +} + +impl Default for PollNext { + fn default() -> Self { + PollNext::Left + } +} + +pin_project! { + /// Stream for the [`select_with_strategy()`] function. See function docs for details. + #[must_use = "streams do nothing unless polled"] + pub struct SelectWithStrategy<St1, St2, Clos, State> { + #[pin] + stream1: Fuse<St1>, + #[pin] + stream2: Fuse<St2>, + state: State, + clos: Clos, + } +} + +/// This function will attempt to pull items from both streams. You provide a +/// closure to tell [`SelectWithStrategy`] which stream to poll. The closure can +/// store state on `SelectWithStrategy` to which it will receive a `&mut` on every +/// invocation. This allows basing the strategy on prior choices. +/// +/// After one of the two input streams completes, the remaining one will be +/// polled exclusively. The returned stream completes when both input +/// streams have completed. +/// +/// Note that this function consumes both streams and returns a wrapped +/// version of them. +/// +/// ## Examples +/// +/// ### Priority +/// This example shows how to always prioritize the left stream. +/// +/// ```rust +/// # futures::executor::block_on(async { +/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt }; +/// +/// let left = repeat(1); +/// let right = repeat(2); +/// +/// // We don't need any state, so let's make it an empty tuple. +/// // We must provide some type here, as there is no way for the compiler +/// // to infer it. As we don't need to capture variables, we can just +/// // use a function pointer instead of a closure. +/// fn prio_left(_: &mut ()) -> PollNext { PollNext::Left } +/// +/// let mut out = select_with_strategy(left, right, prio_left); +/// +/// for _ in 0..100 { +/// // Whenever we poll out, we will alwas get `1`. +/// assert_eq!(1, out.select_next_some().await); +/// } +/// # }); +/// ``` +/// +/// ### Round Robin +/// This example shows how to select from both streams round robin. +/// Note: this special case is provided by [`futures-util::stream::select`]. +/// +/// ```rust +/// # futures::executor::block_on(async { +/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt }; +/// +/// let left = repeat(1); +/// let right = repeat(2); +/// +/// let rrobin = |last: &mut PollNext| last.toggle(); +/// +/// let mut out = select_with_strategy(left, right, rrobin); +/// +/// for _ in 0..100 { +/// // We should be alternating now. +/// assert_eq!(1, out.select_next_some().await); +/// assert_eq!(2, out.select_next_some().await); +/// } +/// # }); +/// ``` +pub fn select_with_strategy<St1, St2, Clos, State>( + stream1: St1, + stream2: St2, + which: Clos, +) -> SelectWithStrategy<St1, St2, Clos, State> +where + St1: Stream, + St2: Stream<Item = St1::Item>, + Clos: FnMut(&mut State) -> PollNext, + State: Default, +{ + assert_stream::<St1::Item, _>(SelectWithStrategy { + stream1: stream1.fuse(), + stream2: stream2.fuse(), + state: Default::default(), + clos: which, + }) +} + +impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { + /// Acquires a reference to the underlying streams that this combinator is + /// pulling from. + pub fn get_ref(&self) -> (&St1, &St2) { + (self.stream1.get_ref(), self.stream2.get_ref()) + } + + /// Acquires a mutable reference to the underlying streams that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { + (self.stream1.get_mut(), self.stream2.get_mut()) + } + + /// Acquires a pinned mutable reference to the underlying streams that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { + let this = self.project(); + (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + } + + /// Consumes this combinator, returning the underlying streams. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> (St1, St2) { + (self.stream1.into_inner(), self.stream2.into_inner()) + } +} + +impl<St1, St2, Clos, State> FusedStream for SelectWithStrategy<St1, St2, Clos, State> +where + St1: Stream, + St2: Stream<Item = St1::Item>, + Clos: FnMut(&mut State) -> PollNext, +{ + fn is_terminated(&self) -> bool { + self.stream1.is_terminated() && self.stream2.is_terminated() + } +} + +impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State> +where + St1: Stream, + St2: Stream<Item = St1::Item>, + Clos: FnMut(&mut State) -> PollNext, +{ + type Item = St1::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { + let this = self.project(); + + match (this.clos)(this.state) { + PollNext::Left => poll_inner(this.stream1, this.stream2, cx), + PollNext::Right => poll_inner(this.stream2, this.stream1, cx), + } + } +} + +fn poll_inner<St1, St2>( + a: Pin<&mut St1>, + b: Pin<&mut St2>, + cx: &mut Context<'_>, +) -> Poll<Option<St1::Item>> +where + St1: Stream, + St2: Stream<Item = St1::Item>, +{ + let a_done = match a.poll_next(cx) { + Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), + Poll::Ready(None) => true, + Poll::Pending => false, + }; + + match b.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) if a_done => Poll::Ready(None), + Poll::Ready(None) | Poll::Pending => Poll::Pending, + } +} + +impl<St1, St2, Clos, State> fmt::Debug for SelectWithStrategy<St1, St2, Clos, State> +where + St1: fmt::Debug, + St2: fmt::Debug, + State: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SelectWithStrategy") + .field("stream1", &self.stream1) + .field("stream2", &self.stream2) + .field("state", &self.state) + .finish() + } +} diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs new file mode 100644 index 0000000..ba2baa5 --- /dev/null +++ b/src/stream/stream/all.rs @@ -0,0 +1,92 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`all`](super::StreamExt::all) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct All<St, Fut, F> { + #[pin] + stream: St, + f: F, + accum: Option<bool>, + #[pin] + future: Option<Fut>, + } +} + +impl<St, Fut, F> fmt::Debug for All<St, Fut, F> +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("All") + .field("stream", &self.stream) + .field("accum", &self.accum) + .field("future", &self.future) + .finish() + } +} + +impl<St, Fut, F> All<St, Fut, F> +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = bool>, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, accum: Some(true), future: None } + } +} + +impl<St, Fut, F> FusedFuture for All<St, Fut, F> +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = bool>, +{ + fn is_terminated(&self) -> bool { + self.accum.is_none() && self.future.is_none() + } +} + +impl<St, Fut, F> Future for All<St, Fut, F> +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = bool>, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> { + let mut this = self.project(); + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let acc = this.accum.unwrap() && ready!(fut.poll(cx)); + if !acc { + break false; + } // early exit + *this.accum = Some(acc); + this.future.set(None); + } else if this.accum.is_some() { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + this.future.set(Some((this.f)(item))); + } + None => { + break this.accum.take().unwrap(); + } + } + } else { + panic!("All polled after completion") + } + }) + } +} diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs new file mode 100644 index 0000000..f023125 --- /dev/null +++ b/src/stream/stream/any.rs @@ -0,0 +1,92 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`any`](super::StreamExt::any) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Any<St, Fut, F> { + #[pin] + stream: St, + f: F, + accum: Option<bool>, + #[pin] + future: Option<Fut>, + } +} + +impl<St, Fut, F> fmt::Debug for Any<St, Fut, F> +where + St: fmt::Debug, + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Any") + .field("stream", &self.stream) + .field("accum", &self.accum) + .field("future", &self.future) + .finish() + } +} + +impl<St, Fut, F> Any<St, Fut, F> +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = bool>, +{ + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, accum: Some(false), future: None } + } +} + +impl<St, Fut, F> FusedFuture for Any<St, Fut, F> +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = bool>, +{ + fn is_terminated(&self) -> bool { + self.accum.is_none() && self.future.is_none() + } +} + +impl<St, Fut, F> Future for Any<St, Fut, F> +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = bool>, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> { + let mut this = self.project(); + Poll::Ready(loop { + if let Some(fut) = this.future.as_mut().as_pin_mut() { + // we're currently processing a future to produce a new accum value + let acc = this.accum.unwrap() || ready!(fut.poll(cx)); + if acc { + break true; + } // early exit + *this.accum = Some(acc); + this.future.set(None); + } else if this.accum.is_some() { + // we're waiting on a new item from the stream + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(item) => { + this.future.set(Some((this.f)(item))); + } + None => { + break this.accum.take().unwrap(); + } + } + } else { + panic!("Any polled after completion") + } + }) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 9089e6e..b3b0155 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -70,6 +70,14 @@ mod fold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::fold::Fold; +mod any; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::any::Any; + +mod all; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::all::All; + #[cfg(feature = "sink")] mod forward; @@ -169,35 +177,41 @@ mod scan; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::scan::Scan; -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - mod buffer_unordered; - #[cfg(feature = "alloc")] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::buffer_unordered::BufferUnordered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod buffer_unordered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::buffer_unordered::BufferUnordered; - #[cfg(feature = "alloc")] - mod buffered; - #[cfg(feature = "alloc")] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::buffered::Buffered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod buffered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::buffered::Buffered; - #[cfg(feature = "alloc")] - mod for_each_concurrent; - #[cfg(feature = "alloc")] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::for_each_concurrent::ForEachConcurrent; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod for_each_concurrent; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::for_each_concurrent::ForEachConcurrent; - #[cfg(feature = "sink")] - #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] - #[cfg(feature = "alloc")] - mod split; - #[cfg(feature = "sink")] - #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] - #[cfg(feature = "alloc")] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::split::{SplitStream, SplitSink, ReuniteError}; -} +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] +#[cfg(feature = "alloc")] +mod split; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "sink")] +#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::split::{ReuniteError, SplitSink, SplitStream}; #[cfg(feature = "std")] mod catch_unwind; @@ -621,6 +635,50 @@ pub trait StreamExt: Stream { assert_future::<T, _>(Fold::new(self, f, init)) } + /// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter(0..10); + /// let contain_three = number_stream.any(|i| async move { i == 3 }); + /// assert_eq!(contain_three.await, true); + /// # }); + /// ``` + fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F> + where + F: FnMut(Self::Item) -> Fut, + Fut: Future<Output = bool>, + Self: Sized, + { + assert_future::<bool, _>(Any::new(self, f)) + } + + /// Execute predicate over asynchronous stream, and return `true` if all element in stream satisfied a predicate. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let number_stream = stream::iter(0..10); + /// let less_then_twenty = number_stream.all(|i| async move { i < 20 }); + /// assert_eq!(less_then_twenty.await, true); + /// # }); + /// ``` + fn all<Fut, F>(self, f: F) -> All<Self, Fut, F> + where + F: FnMut(Self::Item) -> Fut, + Fut: Future<Output = bool>, + Self: Sized, + { + assert_future::<bool, _>(All::new(self, f)) + } + /// Flattens a stream of streams into just one continuous stream. /// /// # Examples diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 11cd9c0..455ddca 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -12,6 +12,8 @@ use crate::fns::{ use crate::future::assert_future; use crate::stream::assert_stream; use crate::stream::{Inspect, Map}; +#[cfg(feature = "alloc")] +use alloc::vec::Vec; use core::pin::Pin; use futures_core::{ future::{Future, TryFuture}, @@ -94,6 +96,12 @@ mod try_concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_concat::TryConcat; +#[cfg(feature = "alloc")] +mod try_chunks; +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_chunks::{TryChunks, TryChunksError}; + mod try_fold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_fold::TryFold; @@ -110,25 +118,29 @@ mod try_take_while; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_take_while::TryTakeWhile; -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - mod try_buffer_unordered; - #[cfg(feature = "alloc")] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::try_buffer_unordered::TryBufferUnordered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod try_buffer_unordered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_buffer_unordered::TryBufferUnordered; - #[cfg(feature = "alloc")] - mod try_buffered; - #[cfg(feature = "alloc")] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::try_buffered::TryBuffered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod try_buffered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_buffered::TryBuffered; - #[cfg(feature = "alloc")] - mod try_for_each_concurrent; - #[cfg(feature = "alloc")] - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::try_for_each_concurrent::TryForEachConcurrent; -} +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod try_for_each_concurrent; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_for_each_concurrent::TryForEachConcurrent; #[cfg(feature = "io")] #[cfg(feature = "std")] @@ -572,6 +584,53 @@ pub trait TryStreamExt: TryStream { assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self)) } + /// An adaptor for chunking up successful items of the stream inside a vector. + /// + /// This combinator will attempt to pull successful items from this stream and buffer + /// them into a local vector. At most `capacity` items will get buffered + /// before they're yielded from the returned stream. + /// + /// Note that the vectors returned from this iterator may not always have + /// `capacity` elements. If the underlying stream ended and only a partial + /// vector was created, it'll be returned. Additionally if an error happens + /// from the underlying stream then the currently buffered items will be + /// yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// This function is similar to + /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits + /// early if an error occurs. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, TryChunksError, TryStreamExt}; + /// + /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); + /// let mut stream = stream.try_chunks(2); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); + /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4))); + /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); + /// # }) + /// ``` + /// + /// # Panics + /// + /// This method will panic if `capacity` is zero. + #[cfg(feature = "alloc")] + fn try_chunks(self, capacity: usize) -> TryChunks<Self> + where + Self: Sized, + { + assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>( + TryChunks::new(self, capacity), + ) + } + /// Attempt to filter the values produced by this stream according to the /// provided asynchronous closure. /// diff --git a/src/stream/try_stream/try_chunks.rs b/src/stream/try_stream/try_chunks.rs new file mode 100644 index 0000000..07d4425 --- /dev/null +++ b/src/stream/try_stream/try_chunks.rs @@ -0,0 +1,131 @@ +use crate::stream::{Fuse, IntoStream, StreamExt}; + +use alloc::vec::Vec; +use core::pin::Pin; +use core::{fmt, mem}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`try_chunks`](super::TryStreamExt::try_chunks) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryChunks<St: TryStream> { + #[pin] + stream: Fuse<IntoStream<St>>, + items: Vec<St::Ok>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl<St: TryStream> TryChunks<St> { + pub(super) fn new(stream: St, capacity: usize) -> Self { + assert!(capacity > 0); + + Self { + stream: IntoStream::new(stream).fuse(), + items: Vec::with_capacity(capacity), + cap: capacity, + } + } + + fn take(self: Pin<&mut Self>) -> Vec<St::Ok> { + let cap = self.cap; + mem::replace(self.project().items, Vec::with_capacity(cap)) + } + + delegate_access_inner!(stream, St, (. .)); +} + +impl<St: TryStream> Stream for TryChunks<St> { + #[allow(clippy::type_complexity)] + type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.as_mut().project(); + loop { + match ready!(this.stream.as_mut().try_poll_next(cx)) { + // Push the item into the buffer and check whether it is full. + // If so, replace our buffer with a new and empty one and return + // the full one. + Some(item) => match item { + Ok(item) => { + this.items.push(item); + if this.items.len() >= *this.cap { + return Poll::Ready(Some(Ok(self.take()))); + } + } + Err(e) => { + return Poll::Ready(Some(Err(TryChunksError(self.take(), e)))); + } + }, + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + None => { + let last = if this.items.is_empty() { + None + } else { + let full_buf = mem::replace(this.items, Vec::new()); + Some(full_buf) + }; + + return Poll::Ready(last.map(Ok)); + } + } + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(chunk_len); + let upper = match upper { + Some(x) => x.checked_add(chunk_len), + None => None, + }; + (lower, upper) + } +} + +impl<St: TryStream + FusedStream> FusedStream for TryChunks<St> { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.items.is_empty() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<S, Item> Sink<Item> for TryChunks<S> +where + S: TryStream + Sink<Item>, +{ + type Error = <S as Sink<Item>>::Error; + + delegate_sink!(stream, Item); +} + +/// Error indicating, that while chunk was collected inner stream produced an error. +/// +/// Contains all items that were collected before an error occurred, and the stream error itself. +#[derive(PartialEq, Eq)] +pub struct TryChunksError<T, E>(pub Vec<T>, pub E); + +impl<T, E: fmt::Debug> fmt::Debug for TryChunksError<T, E> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +impl<T, E: fmt::Display> fmt::Display for TryChunksError<T, E> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +#[cfg(feature = "std")] +impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryChunksError<T, E> {} diff --git a/src/task/mod.rs b/src/task/mod.rs index c4afe30..eff6d48 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -19,18 +19,20 @@ pub use futures_task::noop_waker; #[cfg(feature = "std")] pub use futures_task::noop_waker_ref; -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - pub use futures_task::ArcWake; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use futures_task::ArcWake; - #[cfg(feature = "alloc")] - pub use futures_task::waker; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use futures_task::waker; - #[cfg(feature = "alloc")] - pub use futures_task::{waker_ref, WakerRef}; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use futures_task::{waker_ref, WakerRef}; - pub use futures_core::task::__internal::AtomicWaker; -} +#[cfg(not(futures_no_atomic_cas))] +pub use futures_core::task::__internal::AtomicWaker; mod spawn; pub use self::spawn::{LocalSpawnExt, SpawnExt}; |