diff options
author | David LeGare <legare@google.com> | 2022-03-02 16:21:21 +0000 |
---|---|---|
committer | David LeGare <legare@google.com> | 2022-03-02 16:21:21 +0000 |
commit | 737dc9728807dc6e09c840fce95dc954780f4b71 (patch) | |
tree | 7b826874213015b98b9e0383a89c88be66b64b11 | |
parent | af5e0a0c20c10ba737a56482791cbf1d07db0690 (diff) | |
download | futures-util-737dc9728807dc6e09c840fce95dc954780f4b71.tar.gz |
Update futures-util to 0.3.21
Test: cd external/rust/crates && atest --host -c
Change-Id: I0c08e14c3a0fb14c4f2e497dbe777b597ebf2ee2
36 files changed, 1172 insertions, 259 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index ffd4f55..b52386f 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010" - } -} + "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44" + }, + "path_in_vcs": "futures-util" +}
\ No newline at end of file @@ -42,7 +42,7 @@ rust_test { host_supported: true, crate_name: "futures_util", cargo_env_compat: true, - cargo_pkg_version: "0.3.17", + cargo_pkg_version: "0.3.21", srcs: ["src/lib.rs"], test_suites: ["general-tests"], auto_gen_config: true, @@ -62,13 +62,10 @@ rust_test { "futures-sink", "io", "memchr", - "proc-macro-hack", - "proc-macro-nested", "sink", "slab", "std", ], - cfgs: ["fn_like_proc_macro"], rustlibs: [ "libfutures_channel", "libfutures_core", @@ -78,14 +75,10 @@ rust_test { "libmemchr", "libpin_project_lite", "libpin_utils", - "libproc_macro_nested", "libslab", "libtokio", ], - proc_macros: [ - "libfutures_macro", - "libproc_macro_hack", - ], + proc_macros: ["libfutures_macro"], } rust_library { @@ -93,7 +86,7 @@ rust_library { host_supported: true, crate_name: "futures_util", cargo_env_compat: true, - cargo_pkg_version: "0.3.17", + cargo_pkg_version: "0.3.21", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -108,13 +101,10 @@ rust_library { "futures-sink", "io", "memchr", - "proc-macro-hack", - "proc-macro-nested", "sink", "slab", "std", ], - cfgs: ["fn_like_proc_macro"], rustlibs: [ "libfutures_channel", "libfutures_core", @@ -124,13 +114,9 @@ rust_library { "libmemchr", "libpin_project_lite", "libpin_utils", - "libproc_macro_nested", "libslab", ], - proc_macros: [ - "libfutures_macro", - "libproc_macro_hack", - ], + proc_macros: ["libfutures_macro"], apex_available: [ "//apex_available:platform", "com.android.bluetooth", @@ -11,45 +11,51 @@ [package] edition = "2018" +rust-version = "1.45" name = "futures-util" -version = "0.3.17" -authors = ["Alex Crichton <alex@alexcrichton.com>"] -description = "Common utilities and extension traits for the futures-rs library.\n" +version = "0.3.21" +description = """ +Common utilities and extension traits for the futures-rs library. +""" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures-util/0.3" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" + [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] + [dependencies.futures-channel] -version = "0.3.17" +version = "0.3.21" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.17" +version = "0.3.21" default-features = false [dependencies.futures-io] -version = "0.3.17" +version = "0.3.21" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.17" +version = "=0.3.21" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.17" +version = "0.3.21" optional = true default-features = false [dependencies.futures-task] -version = "0.3.17" +version = "0.3.21" default-features = false [dependencies.futures_01] @@ -67,14 +73,6 @@ version = "0.2.4" [dependencies.pin-utils] version = "0.1.0" -[dependencies.proc-macro-hack] -version = "0.5.19" -optional = true - -[dependencies.proc-macro-nested] -version = "0.1.2" -optional = true - [dependencies.slab] version = "0.4.2" optional = true @@ -82,24 +80,54 @@ optional = true [dependencies.tokio-io] version = "0.1.9" optional = true + [dev-dependencies.tokio] version = "0.1.11" -[build-dependencies.autocfg] -version = "1" [features] -alloc = ["futures-core/alloc", "futures-task/alloc"] +alloc = [ + "futures-core/alloc", + "futures-task/alloc", +] async-await = [] -async-await-macro = ["async-await", "futures-macro", "proc-macro-hack", "proc-macro-nested"] +async-await-macro = [ + "async-await", + "futures-macro", +] bilock = [] cfg-target-has-atomic = [] -channel = ["std", "futures-channel"] -compat = ["std", "futures_01"] -default = ["std", "async-await", "async-await-macro"] -io = ["std", "futures-io", "memchr"] -io-compat = ["io", "compat", "tokio-io"] -read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"] +channel = [ + "std", + "futures-channel", +] +compat = [ + "std", + "futures_01", +] +default = [ + "std", + "async-await", + "async-await-macro", +] +io = [ + "std", + "futures-io", + "memchr", +] +io-compat = [ + "io", + "compat", + "tokio-io", +] sink = ["futures-sink"] -std = ["alloc", "futures-core/std", "futures-task/std", "slab"] -unstable = ["futures-core/unstable", "futures-task/unstable"] +std = [ + "alloc", + "futures-core/std", + "futures-task/std", + "slab", +] +unstable = [ + "futures-core/unstable", + "futures-task/unstable", +] write-all-vectored = ["io"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index a8e9362..46ec854 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,12 +1,11 @@ [package] name = "futures-util" +version = "0.3.21" edition = "2018" -version = "0.3.17" -authors = ["Alex Crichton <alex@alexcrichton.com>"] +rust-version = "1.45" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures-util/0.3" description = """ Common utilities and extension traits for the futures-rs library. """ @@ -16,7 +15,7 @@ default = ["std", "async-await", "async-await-macro"] std = ["alloc", "futures-core/std", "futures-task/std", "slab"] alloc = ["futures-core/alloc", "futures-task/alloc"] async-await = [] -async-await-macro = ["async-await", "futures-macro", "proc-macro-hack", "proc-macro-nested"] +async-await-macro = ["async-await", "futures-macro"] compat = ["std", "futures_01"] io-compat = ["io", "compat", "tokio-io"] sink = ["futures-sink"] @@ -28,25 +27,19 @@ channel = ["std", "futures-channel"] # `unstable` feature as an explicit opt-in to unstable API. unstable = ["futures-core/unstable", "futures-task/unstable"] bilock = [] -read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"] write-all-vectored = ["io"] # These features are no longer used. # TODO: remove in the next major version. cfg-target-has-atomic = [] -[build-dependencies] -autocfg = "1" - [dependencies] -futures-core = { path = "../futures-core", version = "0.3.17", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.17", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.17", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.17", default-features = false, optional = true } -proc-macro-hack = { version = "0.5.19", optional = true } -proc-macro-nested = { version = "0.1.2", optional = true } +futures-core = { path = "../futures-core", version = "0.3.21", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.21", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.21", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.21", default-features = false, optional = true } slab = { version = "0.4.2", optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.17.crate" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.21.crate" } - version: "0.3.17" + version: "0.3.21" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 9 - day: 22 + year: 2022 + month: 3 + day: 1 } } diff --git a/README.md b/README.md new file mode 100644 index 0000000..6e0aaed --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# futures-util + +Common utilities and extension traits for the futures-rs library. + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +futures-util = "0.3" +``` + +The current `futures-util` requires Rust 1.45 or later. + +## License + +Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or +[MIT license](LICENSE-MIT) at your option. + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall +be dual licensed as above, without any additional terms or conditions. diff --git a/benches/flatten_unordered.rs b/benches/flatten_unordered.rs new file mode 100644 index 0000000..64d5f9a --- /dev/null +++ b/benches/flatten_unordered.rs @@ -0,0 +1,66 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{self, FutureExt}; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use std::collections::VecDeque; +use std::thread; + +#[bench] +fn oneshot_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + const STREAM_ITEM_COUNT: usize = 1; + + b.iter(|| { + let mut txs = VecDeque::with_capacity(STREAM_COUNT); + let mut rxs = Vec::new(); + + for _ in 0..STREAM_COUNT { + let (tx, rx) = oneshot::channel(); + txs.push_back(tx); + rxs.push(rx); + } + + thread::spawn(move || { + let mut last = 1; + while let Some(tx) = txs.pop_front() { + let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); + last += STREAM_ITEM_COUNT; + } + }); + + let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { + async { + if let Some(next) = vals.next() { + let val = next.await.unwrap(); + Some((val, vals)) + } else { + None + } + } + .boxed() + }) + .flatten_unordered(None); + + block_on(future::poll_fn(move |cx| { + let mut count = 0; + loop { + match flatten.poll_next_unpin(cx) { + Poll::Ready(None) => break, + Poll::Ready(Some(_)) => { + count += 1; + } + _ => {} + } + } + assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); + + Poll::Ready(()) + })) + }); +} @@ -1,10 +1,3 @@ -#![warn(rust_2018_idioms, single_use_lifetimes)] - -use autocfg::AutoCfg; -use std::env; - -include!("no_atomic_cas.rs"); - // The rustc-cfg listed below are considered public API, but it is *unstable* // and outside of the normal semver guarantees: // @@ -14,10 +7,15 @@ include!("no_atomic_cas.rs"); // need to enable it manually when building for custom targets or using // non-cargo build systems that don't run the build script. // -// With the exceptions mentioned above, the rustc-cfg strings below are -// *not* public API. Please let us know by opening a GitHub issue if your build -// environment requires some way to enable these cfgs other than by executing -// our build script. +// With the exceptions mentioned above, the rustc-cfg emitted by the build +// script are *not* public API. + +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use std::env; + +include!("no_atomic_cas.rs"); + fn main() { let target = match env::var("TARGET") { Ok(target) => target, @@ -35,27 +33,9 @@ fn main() { // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't // run. This is needed for compatibility with non-cargo build systems that // don't run the build script. - if NO_ATOMIC_CAS_TARGETS.contains(&&*target) { + if NO_ATOMIC_CAS.contains(&&*target) { println!("cargo:rustc-cfg=futures_no_atomic_cas"); } - let cfg = match AutoCfg::new() { - Ok(cfg) => cfg, - Err(e) => { - println!( - "cargo:warning={}: unable to determine rustc version: {}", - env!("CARGO_PKG_NAME"), - e - ); - return; - } - }; - - // Function like procedural macros in expressions patterns statements stabilized in Rust 1.45: - // https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements - if cfg.probe_rustc_version(1, 45) { - println!("cargo:rustc-cfg=fn_like_proc_macro"); - } - println!("cargo:rerun-if-changed=no_atomic_cas.rs"); } diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs index 4708bf8..9b05d4b 100644 --- a/no_atomic_cas.rs +++ b/no_atomic_cas.rs @@ -1,7 +1,7 @@ // This file is @generated by no_atomic_cas.sh. // It is not intended for manual editing. -const NO_ATOMIC_CAS_TARGETS: &[&str] = &[ +const NO_ATOMIC_CAS: &[&str] = &[ "avr-unknown-gnu-atmega328", "bpfeb-unknown-none", "bpfel-unknown-none", diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs index c5cdd9b..28f3b23 100644 --- a/src/async_await/join_mod.rs +++ b/src/async_await/join_mod.rs @@ -81,12 +81,10 @@ macro_rules! document_join_macro { #[allow(unreachable_pub)] #[doc(hidden)] -#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::join_internal; #[allow(unreachable_pub)] #[doc(hidden)] -#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::try_join_internal; document_join_macro! { diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs index 37e938d..1d13067 100644 --- a/src/async_await/select_mod.rs +++ b/src/async_await/select_mod.rs @@ -29,9 +29,6 @@ macro_rules! document_select_macro { /// It is also gated behind the `async-await` feature of this library, which is /// activated by default. /// - /// Note that `select!` relies on `proc-macro-hack`, and may require to set the - /// compiler's recursion limit very high, e.g. `#![recursion_limit="1024"]`. - /// /// # Examples /// /// ``` @@ -309,12 +306,10 @@ macro_rules! document_select_macro { #[cfg(feature = "std")] #[allow(unreachable_pub)] #[doc(hidden)] -#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::select_internal; #[allow(unreachable_pub)] #[doc(hidden)] -#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::select_biased_internal; document_select_macro! { diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs index 7743406..1c8002f 100644 --- a/src/async_await/stream_select_mod.rs +++ b/src/async_await/stream_select_mod.rs @@ -3,7 +3,6 @@ #[cfg(feature = "std")] #[allow(unreachable_pub)] #[doc(hidden)] -#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::stream_select_internal; /// Combines several streams, all producing the same `Item` type, into one stream. @@ -13,10 +12,6 @@ pub use futures_macro::stream_select_internal; /// /// If multiple streams are ready, one will be pseudo randomly selected at runtime. /// -/// This macro is gated behind the `async-await` feature of this library, which is activated by default. -/// Note that `stream_select!` relies on `proc-macro-hack`, and may require to set the compiler's recursion -/// limit very high, e.g. `#![recursion_limit="1024"]`. -/// /// # Examples /// /// ``` diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs index 17239a4..36de1da 100644 --- a/src/compat/compat01as03.rs +++ b/src/compat/compat01as03.rs @@ -64,6 +64,7 @@ pub trait Future01CompatExt: Future01 { /// [`Future<Output = Result<T, E>>`](futures_core::future::Future). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo /// # // feature issues @@ -90,6 +91,7 @@ pub trait Stream01CompatExt: Stream01 { /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// use futures::stream::StreamExt; /// use futures_util::compat::Stream01CompatExt; @@ -119,6 +121,7 @@ pub trait Sink01CompatExt: Sink01 { /// [`Sink<T, Error = E>`](futures_sink::Sink). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// use futures::{sink::SinkExt, stream::StreamExt}; /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt}; @@ -351,8 +354,6 @@ unsafe impl UnsafeNotify01 for NotifyWaker { #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] mod io { use super::*; - #[cfg(feature = "read-initializer")] - use futures_io::Initializer; use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03}; use std::io::Error; use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01}; @@ -364,6 +365,7 @@ mod io { /// [`AsyncRead`](futures_io::AsyncRead). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// use futures::io::AsyncReadExt; /// use futures_util::compat::AsyncRead01CompatExt; @@ -393,6 +395,7 @@ mod io { /// [`AsyncWrite`](futures_io::AsyncWrite). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// use futures::io::AsyncWriteExt; /// use futures_util::compat::AsyncWrite01CompatExt; @@ -416,16 +419,6 @@ mod io { impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {} impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - // check if `prepare_uninitialized_buffer` needs zeroing - if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) { - Initializer::zeroing() - } else { - Initializer::nop() - } - } - fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs index 2573fe7..5d3a6e9 100644 --- a/src/compat/compat03as01.rs +++ b/src/compat/compat03as01.rs @@ -236,17 +236,7 @@ mod io { } } - impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> { - #[cfg(feature = "read-initializer")] - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - let initializer = self.inner.initializer(); - let does_init = initializer.should_initialize(); - if does_init { - initializer.initialize(buf); - } - does_init - } - } + impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {} impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { diff --git a/src/compat/executor.rs b/src/compat/executor.rs index e25705b..ea0c67a 100644 --- a/src/compat/executor.rs +++ b/src/compat/executor.rs @@ -17,6 +17,7 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'st /// futures 0.3 [`Spawn`](futures_task::Spawn). /// /// ``` + /// # if cfg!(miri) { return; } // Miri does not support epoll /// use futures::task::SpawnExt; /// use futures::future::{FutureExt, TryFutureExt}; /// use futures_util::compat::Executor01CompatExt; diff --git a/src/future/either.rs b/src/future/either.rs index 35650da..9602de7 100644 --- a/src/future/either.rs +++ b/src/future/either.rs @@ -184,8 +184,6 @@ mod if_std { use core::pin::Pin; use core::task::{Context, Poll}; - #[cfg(feature = "read-initializer")] - use futures_io::Initializer; use futures_io::{ AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom, }; @@ -195,14 +193,6 @@ mod if_std { A: AsyncRead, B: AsyncRead, { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - match self { - Either::Left(x) => x.initializer(), - Either::Right(x) => x.initializer(), - } - } - fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/allow_std.rs b/src/io/allow_std.rs index 1d13e0c..ec30ee3 100644 --- a/src/io/allow_std.rs +++ b/src/io/allow_std.rs @@ -1,6 +1,4 @@ use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; use std::pin::Pin; use std::{fmt, io}; @@ -121,10 +119,6 @@ where fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { self.0.read_vectored(bufs) } - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.0.initializer() - } fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { self.0.read_to_end(buf) } @@ -155,11 +149,6 @@ where ) -> Poll<io::Result<usize>> { Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs)))) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.0.initializer() - } } impl<T> io::Seek for AllowStdIo<T> diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 2d585a9..0334a9f 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -2,8 +2,6 @@ use super::DEFAULT_BUF_SIZE; use futures_core::future::Future; use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; use pin_project_lite::pin_project; use std::io::{self, Read}; @@ -144,12 +142,6 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> { self.consume(nread); Poll::Ready(Ok(nread)) } - - // we can't skip unconditionally because of the large buffer case in read. - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.inner.initializer() - } } impl<R: AsyncRead> AsyncBufRead for BufReader<R> { diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index f292b87..cb74863 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -6,6 +6,7 @@ use pin_project_lite::pin_project; use std::fmt; use std::io::{self, Write}; use std::pin::Pin; +use std::ptr; pin_project! { /// Wraps a writer and buffers its output. @@ -49,7 +50,7 @@ impl<W: AsyncWrite> BufWriter<W> { Self { inner, buf: Vec::with_capacity(cap), written: 0 } } - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + pub(super) fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { let mut this = self.project(); let len = this.buf.len(); @@ -83,6 +84,68 @@ impl<W: AsyncWrite> BufWriter<W> { pub fn buffer(&self) -> &[u8] { &self.buf } + + /// Capacity of `buf`. how many chars can be held in buffer + pub(super) fn capacity(&self) -> usize { + self.buf.capacity() + } + + /// Remaining number of bytes to reach `buf` 's capacity + #[inline] + pub(super) fn spare_capacity(&self) -> usize { + self.buf.capacity() - self.buf.len() + } + + /// Write a byte slice directly into buffer + /// + /// Will truncate the number of bytes written to `spare_capacity()` so you want to + /// calculate the size of your slice to avoid losing bytes + /// + /// Based on `std::io::BufWriter` + pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize { + let available = self.spare_capacity(); + let amt_to_buffer = available.min(buf.len()); + + // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction. + unsafe { + self.write_to_buffer_unchecked(&buf[..amt_to_buffer]); + } + + amt_to_buffer + } + + /// Write byte slice directly into `self.buf` + /// + /// Based on `std::io::BufWriter` + #[inline] + unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) { + debug_assert!(buf.len() <= self.spare_capacity()); + let this = self.project(); + let old_len = this.buf.len(); + let buf_len = buf.len(); + let src = buf.as_ptr(); + let dst = this.buf.as_mut_ptr().add(old_len); + ptr::copy_nonoverlapping(src, dst, buf_len); + this.buf.set_len(old_len + buf_len); + } + + /// Write directly using `inner`, bypassing buffering + pub(super) fn inner_poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_write(cx, buf) + } + + /// Write directly using `inner`, bypassing buffering + pub(super) fn inner_poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_write_vectored(cx, bufs) + } } impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { diff --git a/src/io/chain.rs b/src/io/chain.rs index a35c50d..728a3d2 100644 --- a/src/io/chain.rs +++ b/src/io/chain.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut}; use pin_project_lite::pin_project; use std::fmt; @@ -111,16 +109,6 @@ where } this.second.poll_read_vectored(cx, bufs) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - let initializer = self.first.initializer(); - if initializer.should_initialize() { - initializer - } else { - self.second.initializer() - } - } } impl<T, U> AsyncBufRead for Chain<T, U> diff --git a/src/io/empty.rs b/src/io/empty.rs index ab2395a..02f6103 100644 --- a/src/io/empty.rs +++ b/src/io/empty.rs @@ -1,6 +1,4 @@ use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead}; use std::fmt; use std::io; @@ -43,12 +41,6 @@ impl AsyncRead for Empty { ) -> Poll<io::Result<usize>> { Poll::Ready(Ok(0)) } - - #[cfg(feature = "read-initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } impl AsyncBufRead for Empty { diff --git a/src/io/line_writer.rs b/src/io/line_writer.rs new file mode 100644 index 0000000..71cd668 --- /dev/null +++ b/src/io/line_writer.rs @@ -0,0 +1,155 @@ +use super::buf_writer::BufWriter; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncWrite; +use futures_io::IoSlice; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; + +pin_project! { +/// Wrap a writer, like [`BufWriter`] does, but prioritizes buffering lines +/// +/// This was written based on `std::io::LineWriter` which goes into further details +/// explaining the code. +/// +/// Buffering is actually done using `BufWriter`. This class will leverage `BufWriter` +/// to write on-each-line. +#[derive(Debug)] +pub struct LineWriter<W: AsyncWrite> { + #[pin] + buf_writer: BufWriter<W>, +} +} + +impl<W: AsyncWrite> LineWriter<W> { + /// Create a new `LineWriter` with default buffer capacity. The default is currently 1KB + /// which was taken from `std::io::LineWriter` + pub fn new(inner: W) -> LineWriter<W> { + LineWriter::with_capacity(1024, inner) + } + + /// Creates a new `LineWriter` with the specified buffer capacity. + pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> { + LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) } + } + + /// Flush `buf_writer` if last char is "new line" + fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + let this = self.project(); + match this.buf_writer.buffer().last().copied() { + Some(b'\n') => this.buf_writer.flush_buf(cx), + _ => Poll::Ready(Ok(())), + } + } + + /// Returns a reference to `buf_writer`'s internally buffered data. + pub fn buffer(&self) -> &[u8] { + self.buf_writer.buffer() + } + + /// Acquires a reference to the underlying sink or stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &W { + self.buf_writer.get_ref() + } +} + +impl<W: AsyncWrite> AsyncWrite for LineWriter<W> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + let mut this = self.as_mut().project(); + let newline_index = match memchr::memrchr(b'\n', buf) { + None => { + ready!(self.as_mut().flush_if_completed_line(cx)?); + return self.project().buf_writer.poll_write(cx, buf); + } + Some(newline_index) => newline_index + 1, + }; + + ready!(this.buf_writer.as_mut().poll_flush(cx)?); + + let lines = &buf[..newline_index]; + + let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? }; + + if flushed == 0 { + return Poll::Ready(Ok(0)); + } + + let tail = if flushed >= newline_index { + &buf[flushed..] + } else if newline_index - flushed <= this.buf_writer.capacity() { + &buf[flushed..newline_index] + } else { + let scan_area = &buf[flushed..]; + let scan_area = &scan_area[..this.buf_writer.capacity()]; + match memchr::memrchr(b'\n', scan_area) { + Some(newline_index) => &scan_area[..newline_index + 1], + None => scan_area, + } + }; + + let buffered = this.buf_writer.as_mut().write_to_buf(tail); + Poll::Ready(Ok(flushed + buffered)) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + let mut this = self.as_mut().project(); + // `is_write_vectored()` is handled in original code, but not in this crate + // see https://github.com/rust-lang/rust/issues/70436 + + let last_newline_buf_idx = bufs + .iter() + .enumerate() + .rev() + .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i)); + let last_newline_buf_idx = match last_newline_buf_idx { + None => { + ready!(self.as_mut().flush_if_completed_line(cx)?); + return self.project().buf_writer.poll_write_vectored(cx, bufs); + } + Some(i) => i, + }; + + ready!(this.buf_writer.as_mut().poll_flush(cx)?); + + let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); + + let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? }; + if flushed == 0 { + return Poll::Ready(Ok(0)); + } + + let lines_len = lines.iter().map(|buf| buf.len()).sum(); + if flushed < lines_len { + return Poll::Ready(Ok(flushed)); + } + + let buffered: usize = tail + .iter() + .filter(|buf| !buf.is_empty()) + .map(|buf| this.buf_writer.as_mut().write_to_buf(buf)) + .take_while(|&n| n > 0) + .sum(); + + Poll::Ready(Ok(flushed + buffered)) + } + + /// Forward to `buf_writer` 's `BufWriter::poll_flush()` + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.as_mut().project().buf_writer.poll_flush(cx) + } + + /// Forward to `buf_writer` 's `BufWriter::poll_close()` + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.as_mut().project().buf_writer.poll_close(cx) + } +} diff --git a/src/io/mod.rs b/src/io/mod.rs index 16cf5a7..4dd2e02 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -26,10 +26,6 @@ use std::{pin::Pin, ptr}; // Re-export some types from `std::io` so that users don't have to deal // with conflicts when `use`ing `futures::io` and `std::io`. #[doc(no_inline)] -#[cfg(feature = "read-initializer")] -#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] -pub use std::io::Initializer; -#[doc(no_inline)] pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; @@ -40,15 +36,9 @@ const DEFAULT_BUF_SIZE: usize = 8 * 1024; /// Initializes a buffer if necessary. /// -/// A buffer is always initialized if `read-initializer` feature is disabled. +/// A buffer is currently always initialized. #[inline] unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) { - #[cfg(feature = "read-initializer")] - { - if !_reader.initializer().should_initialize() { - return; - } - } ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) } @@ -61,6 +51,9 @@ pub use self::buf_reader::{BufReader, SeeKRelative}; mod buf_writer; pub use self::buf_writer::BufWriter; +mod line_writer; +pub use self::line_writer::LineWriter; + mod chain; pub use self::chain::Chain; diff --git a/src/io/repeat.rs b/src/io/repeat.rs index 4cefcb2..2828bf0 100644 --- a/src/io/repeat.rs +++ b/src/io/repeat.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncRead, IoSliceMut}; use std::fmt; use std::io; @@ -59,12 +57,6 @@ impl AsyncRead for Repeat { } Poll::Ready(Ok(nwritten)) } - - #[cfg(feature = "read-initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } impl fmt::Debug for Repeat { diff --git a/src/io/take.rs b/src/io/take.rs index 0583020..2c49480 100644 --- a/src/io/take.rs +++ b/src/io/take.rs @@ -1,7 +1,5 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; -#[cfg(feature = "read-initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead}; use pin_project_lite::pin_project; use std::pin::Pin; @@ -100,11 +98,6 @@ impl<R: AsyncRead> AsyncRead for Take<R> { *this.limit -= n as u64; Poll::Ready(Ok(n)) } - - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - self.inner.initializer() - } } impl<R: AsyncBufRead> AsyncBufRead for Take<R> { @@ -1,7 +1,6 @@ //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, //! and the `AsyncRead` and `AsyncWrite` traits. -#![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] #![cfg_attr(not(feature = "std"), no_std)] #![warn( @@ -23,9 +22,6 @@ #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); -#[cfg(all(feature = "read-initializer", not(feature = "unstable")))] -compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(feature = "alloc")] extern crate alloc; @@ -148,11 +144,6 @@ macro_rules! delegate_async_write { #[cfg(feature = "std")] macro_rules! delegate_async_read { ($field:ident) => { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> $crate::io::Initializer { - self.$field.initializer() - } - fn poll_read( self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 4a05d88..fdbd53d 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -121,8 +121,9 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Weak::new(), + woken: AtomicBool::new(false), }); - let stub_ptr = &*stub as *const Task<Fut>; + let stub_ptr = Arc::as_ptr(&stub); let ready_to_run_queue = Arc::new(ReadyToRunQueue { waker: AtomicWaker::new(), head: AtomicPtr::new(stub_ptr as *mut _), @@ -167,6 +168,7 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), + woken: AtomicBool::new(false), }); // Reset the `is_terminated` flag if we've previously marked ourselves @@ -375,7 +377,7 @@ impl<Fut> FuturesUnordered<Fut> { // The `ReadyToRunQueue` stub is never inserted into the `head_all` // list, and its pointer value will remain valid for the lifetime of // this `FuturesUnordered`, so we can make use of its value here. - &*self.ready_to_run_queue.stub as *const _ as *mut _ + Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _ } } @@ -383,25 +385,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - // Variable to determine how many times it is allowed to poll underlying - // futures without yielding. - // - // A single call to `poll_next` may potentially do a lot of work before - // yielding. This happens in particular if the underlying futures are awoken - // frequently but continue to return `Pending`. This is problematic if other - // tasks are waiting on the executor, since they do not get to run. This value - // caps the number of calls to `poll` on underlying futures a single call to - // `poll_next` is allowed to make. - // - // The value is the length of FuturesUnordered. This ensures that each - // future is polled only once at most per iteration. - // - // See also https://github.com/rust-lang/futures-rs/issues/2047. - let yield_every = self.len(); + let len = self.len(); // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; + let mut yielded = 0; // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -512,7 +501,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { // the internal allocation, appropriately accessing fields and // deallocating the task if need be. let res = { - let waker = Task::waker_ref(bomb.task.as_ref().unwrap()); + let task = bomb.task.as_ref().unwrap(); + // We are only interested in whether the future is awoken before it + // finishes polling, so reset the flag here. + task.woken.store(false, Relaxed); + let waker = Task::waker_ref(task); let mut cx = Context::from_waker(&waker); // Safety: We won't move the future ever again @@ -525,12 +518,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { match res { Poll::Pending => { let task = bomb.task.take().unwrap(); + // If the future was awoken during polling, we assume + // the future wanted to explicitly yield. + yielded += task.woken.load(Relaxed) as usize; bomb.queue.link(task); - if polled == yield_every { - // We have polled a large number of futures in a row without yielding. - // To ensure we do not starve other tasks waiting on the executor, - // we yield here, but immediately wake ourselves up to continue. + // If a future yields, we respect it and yield here. + // If all futures have been polled, we also yield here to + // avoid starving other tasks waiting on the executor. + // (polling the same future twice per iteration may cause + // the problem: https://github.com/rust-lang/futures-rs/pull/2333) + if yielded >= 2 || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs index 5ef6cde..4518705 100644 --- a/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/src/stream/futures_unordered/ready_to_run_queue.rs @@ -83,7 +83,7 @@ impl<Fut> ReadyToRunQueue<Fut> { } pub(super) fn stub(&self) -> *const Task<Fut> { - &*self.stub + Arc::as_ptr(&self.stub) } // Clear the queue of tasks. diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs index da2cd67..ec2114e 100644 --- a/src/stream/futures_unordered/task.rs +++ b/src/stream/futures_unordered/task.rs @@ -1,6 +1,6 @@ use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::sync::atomic::Ordering::{self, SeqCst}; +use core::sync::atomic::Ordering::{self, Relaxed, SeqCst}; use core::sync::atomic::{AtomicBool, AtomicPtr}; use super::abort::abort; @@ -31,6 +31,11 @@ pub(super) struct Task<Fut> { // Whether or not this task is currently in the ready to run queue pub(super) queued: AtomicBool, + + // Whether the future was awoken during polling + // It is possible for this flag to be set to true after the polling, + // but it will be ignored. + pub(super) woken: AtomicBool, } // `Task` can be sent across threads safely because it ensures that @@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> { None => return, }; + arc_self.woken.store(true, Relaxed); + // It's our job to enqueue this task it into the ready to run queue. To // do this we set the `queued` flag, and if successful we then do the // actual queueing operation, ensuring that we're only queued once. @@ -62,7 +69,7 @@ impl<Fut> ArcWake for Task<Fut> { // still. let prev = arc_self.queued.swap(true, SeqCst); if !prev { - inner.enqueue(&**arc_self); + inner.enqueue(Arc::as_ptr(arc_self)); inner.waker.wake(); } } diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs new file mode 100644 index 0000000..513cab7 --- /dev/null +++ b/src/stream/stream/count.rs @@ -0,0 +1,53 @@ +use core::fmt; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`count`](super::StreamExt::count) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Count<St> { + #[pin] + stream: St, + count: usize + } +} + +impl<St> fmt::Debug for Count<St> +where + St: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish() + } +} + +impl<St: Stream> Count<St> { + pub(super) fn new(stream: St) -> Self { + Self { stream, count: 0 } + } +} + +impl<St: FusedStream> FusedFuture for Count<St> { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +impl<St: Stream> Future for Count<St> { + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + Poll::Ready(loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(_) => *this.count += 1, + None => break *this.count, + } + }) + } +} diff --git a/src/stream/stream/flatten_unordered.rs b/src/stream/stream/flatten_unordered.rs new file mode 100644 index 0000000..07f971c --- /dev/null +++ b/src/stream/stream/flatten_unordered.rs @@ -0,0 +1,509 @@ +use alloc::sync::Arc; +use core::{ + cell::UnsafeCell, + convert::identity, + fmt, + num::NonZeroUsize, + pin::Pin, + sync::atomic::{AtomicU8, Ordering}, +}; + +use pin_project_lite::pin_project; + +use futures_core::{ + future::Future, + ready, + stream::{FusedStream, Stream}, + task::{Context, Poll, Waker}, +}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use futures_task::{waker, ArcWake}; + +use crate::stream::FuturesUnordered; + +/// There is nothing to poll and stream isn't being +/// polled or waking at the moment. +const NONE: u8 = 0; + +/// Inner streams need to be polled. +const NEED_TO_POLL_INNER_STREAMS: u8 = 1; + +/// The base stream needs to be polled. +const NEED_TO_POLL_STREAM: u8 = 0b10; + +/// It needs to poll base stream and inner streams. +const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM; + +/// The current stream is being polled at the moment. +const POLLING: u8 = 0b100; + +/// Inner streams are being woken at the moment. +const WAKING_INNER_STREAMS: u8 = 0b1000; + +/// The base stream is being woken at the moment. +const WAKING_STREAM: u8 = 0b10000; + +/// The base stream and inner streams are being woken at the moment. +const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS; + +/// The stream was waked and will be polled. +const WOKEN: u8 = 0b100000; + +/// Determines what needs to be polled, and is stream being polled at the +/// moment or not. +#[derive(Clone, Debug)] +struct SharedPollState { + state: Arc<AtomicU8>, +} + +impl SharedPollState { + /// Constructs new `SharedPollState` with the given state. + fn new(value: u8) -> SharedPollState { + SharedPollState { state: Arc::new(AtomicU8::new(value)) } + } + + /// Attempts to start polling, returning stored state in case of success. + /// Returns `None` if some waker is waking at the moment. + fn start_polling( + &self, + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + if value & WAKING_ALL == NONE { + Some(POLLING) + } else { + None + } + }) + .ok()?; + let bomb = PollStateBomb::new(self, SharedPollState::reset); + + Some((value, bomb)) + } + + /// Starts the waking process and performs bitwise or with the given value. + fn start_waking( + &self, + to_poll: u8, + waking: u8, + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + // Waking process for this waker already started + if value & waking != NONE { + return None; + } + let mut next_value = value | to_poll; + // Only start the waking process if we're not in the polling phase and the stream isn't woken already + if value & (WOKEN | POLLING) == NONE { + next_value |= waking; + } + + if next_value != value { + Some(next_value) + } else { + None + } + }) + .ok()?; + + if value & (WOKEN | POLLING) == NONE { + let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking)); + + Some((value, bomb)) + } else { + None + } + } + + /// Sets current state to + /// - `!POLLING` allowing to use wakers + /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called, + /// or `will_be_woken` flag supplied + /// - `!WAKING_ALL` as + /// * Wakers called during the `POLLING` phase won't propagate their calls + /// * `POLLING` phase can't start if some of the wakers are active + /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. + fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| { + let mut next_value = to_poll; + + value &= NEED_TO_POLL_ALL; + if value != NONE || will_be_woken { + next_value |= WOKEN; + } + next_value |= value; + + Some(next_value & !POLLING & !WAKING_ALL) + }) + .unwrap() + } + + /// Toggles state to non-waking, allowing to start polling. + fn stop_waking(&self, waking: u8) -> u8 { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + let mut next_value = value & !waking; + // Waker will be called only if the current waking state is the same as the specified waker state + if value & WAKING_ALL == waking { + next_value |= WOKEN; + } + + if next_value != value { + Some(next_value) + } else { + None + } + }) + .unwrap_or_else(identity) + } + + /// Resets current state allowing to poll the stream and wake up wakers. + fn reset(&self) -> u8 { + self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel) + } +} + +/// Used to execute some function on the given state when dropped. +struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> { + state: &'a SharedPollState, + drop: Option<F>, +} + +impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> { + /// Constructs new bomb with the given state. + fn new(state: &'a SharedPollState, drop: F) -> Self { + Self { state, drop: Some(drop) } + } + + /// Deactivates bomb, forces it to not call provided function when dropped. + fn deactivate(mut self) { + self.drop.take(); + } + + /// Manually fires the bomb, returning supplied state. + fn fire(mut self) -> Option<u8> { + self.drop.take().map(|drop| (drop)(self.state)) + } +} + +impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> { + fn drop(&mut self) { + if let Some(drop) = self.drop.take() { + (drop)(self.state); + } + } +} + +/// Will update state with the provided value on `wake_by_ref` call +/// and then, if there is a need, call `inner_waker`. +struct InnerWaker { + inner_waker: UnsafeCell<Option<Waker>>, + poll_state: SharedPollState, + need_to_poll: u8, +} + +unsafe impl Send for InnerWaker {} +unsafe impl Sync for InnerWaker {} + +impl InnerWaker { + /// Replaces given waker's inner_waker for polling stream/futures which will + /// update poll state on `wake_by_ref` call. Use only if you need several + /// contexts. + /// + /// ## Safety + /// + /// This function will modify waker's `inner_waker` via `UnsafeCell`, so + /// it should be used only during `POLLING` phase. + unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) -> Waker { + *self_arc.inner_waker.get() = cx.waker().clone().into(); + waker(self_arc.clone()) + } + + /// Attempts to start the waking process for the waker with the given value. + /// If succeeded, then the stream isn't yet woken and not being polled at the moment. + fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + self.poll_state.start_waking(self.need_to_poll, self.waking_state()) + } + + /// Returns the corresponding waking state toggled by this waker. + fn waking_state(&self) -> u8 { + self.need_to_poll << 3 + } +} + +impl ArcWake for InnerWaker { + fn wake_by_ref(self_arc: &Arc<Self>) { + if let Some((_, state_bomb)) = self_arc.start_waking() { + // Safety: now state is not `POLLING` + let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() }; + + if let Some(inner_waker) = waker_opt.clone() { + // Stop waking to allow polling stream + let poll_state_value = state_bomb.fire().unwrap(); + + // Here we want to call waker only if stream isn't woken yet and + // also to optimize the case when two wakers are called at the same time. + // + // In this case the best strategy will be to propagate only the latest waker's awake, + // and then poll both entities in a single `poll_next` call + if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() { + // Wake up inner waker + inner_waker.wake(); + } + } + } + } +} + +pin_project! { + /// Future which contains optional stream. + /// + /// If it's `Some`, it will attempt to call `poll_next` on it, + /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))` + /// or `None` in case of `Poll::Ready(None)`. + /// + /// If `poll_next` will return `Poll::Pending`, it will be forwarded to + /// the future and current task will be notified by waker. + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct PollStreamFut<St> { + #[pin] + stream: Option<St>, + } +} + +impl<St> PollStreamFut<St> { + /// Constructs new `PollStreamFut` using given `stream`. + fn new(stream: impl Into<Option<St>>) -> Self { + Self { stream: stream.into() } + } +} + +impl<St: Stream + Unpin> Future for PollStreamFut<St> { + type Output = Option<(St::Item, PollStreamFut<St>)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut stream = self.project().stream; + + let item = if let Some(stream) = stream.as_mut().as_pin_mut() { + ready!(stream.poll_next(cx)) + } else { + None + }; + let next_item_fut = PollStreamFut::new(stream.get_mut().take()); + let out = item.map(|item| (item, next_item_fut)); + + Poll::Ready(out) + } +} + +pin_project! { + /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) + /// method. + #[project = FlattenUnorderedProj] + #[must_use = "streams do nothing unless polled"] + pub struct FlattenUnordered<St> where St: Stream { + #[pin] + inner_streams: FuturesUnordered<PollStreamFut<St::Item>>, + #[pin] + stream: St, + poll_state: SharedPollState, + limit: Option<NonZeroUsize>, + is_stream_done: bool, + inner_streams_waker: Arc<InnerWaker>, + stream_waker: Arc<InnerWaker>, + } +} + +impl<St> fmt::Debug for FlattenUnordered<St> +where + St: Stream + fmt::Debug, + St::Item: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlattenUnordered") + .field("poll_state", &self.poll_state) + .field("inner_streams", &self.inner_streams) + .field("limit", &self.limit) + .field("stream", &self.stream) + .field("is_stream_done", &self.is_stream_done) + .finish() + } +} + +impl<St> FlattenUnordered<St> +where + St: Stream, + St::Item: Stream + Unpin, +{ + pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> { + let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM); + + FlattenUnordered { + inner_streams: FuturesUnordered::new(), + stream, + is_stream_done: false, + limit: limit.and_then(NonZeroUsize::new), + inner_streams_waker: Arc::new(InnerWaker { + inner_waker: UnsafeCell::new(None), + poll_state: poll_state.clone(), + need_to_poll: NEED_TO_POLL_INNER_STREAMS, + }), + stream_waker: Arc::new(InnerWaker { + inner_waker: UnsafeCell::new(None), + poll_state: poll_state.clone(), + need_to_poll: NEED_TO_POLL_STREAM, + }), + poll_state, + } + } + + delegate_access_inner!(stream, St, ()); +} + +impl<St> FlattenUnorderedProj<'_, St> +where + St: Stream, +{ + /// Checks if current `inner_streams` size is less than optional limit. + fn is_exceeded_limit(&self) -> bool { + self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get()) + } +} + +impl<St> FusedStream for FlattenUnordered<St> +where + St: FusedStream, + St::Item: FusedStream + Unpin, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.inner_streams.is_empty() + } +} + +impl<St> Stream for FlattenUnordered<St> +where + St: Stream, + St::Item: Stream + Unpin, +{ + type Item = <St::Item as Stream>::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut next_item = None; + let mut need_to_poll_next = NONE; + + let mut this = self.as_mut().project(); + + let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() { + Some(value) => value, + _ => { + // Waker was called, just wait for the next poll + return Poll::Pending; + } + }; + + if poll_state_value & NEED_TO_POLL_STREAM != NONE { + // Safety: now state is `POLLING`. + let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) }; + + // Here we need to poll the base stream. + // + // To improve performance, we will attempt to place as many items as we can + // to the `FuturesUnordered` bucket before polling inner streams + loop { + if this.is_exceeded_limit() || *this.is_stream_done { + // We either exceeded the limit or the stream is exhausted + if !*this.is_stream_done { + // The stream needs to be polled in the next iteration + need_to_poll_next |= NEED_TO_POLL_STREAM; + } + + break; + } else { + match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) { + Poll::Ready(Some(inner_stream)) => { + // Add new stream to the inner streams bucket + this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream)); + // Inner streams must be polled afterward + poll_state_value |= NEED_TO_POLL_INNER_STREAMS; + } + Poll::Ready(None) => { + // Mark the stream as done + *this.is_stream_done = true; + } + Poll::Pending => { + break; + } + } + } + } + } + + if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE { + // Safety: now state is `POLLING`. + let inner_streams_waker = + unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) }; + + match this + .inner_streams + .as_mut() + .poll_next(&mut Context::from_waker(&inner_streams_waker)) + { + Poll::Ready(Some(Some((item, next_item_fut)))) => { + // Push next inner stream item future to the list of inner streams futures + this.inner_streams.as_mut().push(next_item_fut); + // Take the received item + next_item = Some(item); + // On the next iteration, inner streams must be polled again + need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS; + } + Poll::Ready(Some(None)) => { + // On the next iteration, inner streams must be polled again + need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS; + } + _ => {} + } + } + + // We didn't have any `poll_next` panic, so it's time to deactivate the bomb + state_bomb.deactivate(); + + let mut force_wake = + // we need to poll the stream and didn't reach the limit yet + need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() + // or we need to poll inner streams again + || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE; + + // Stop polling and swap the latest state + poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake); + // If state was changed during `POLLING` phase, need to manually call a waker + force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE; + + let is_done = *this.is_stream_done && this.inner_streams.is_empty(); + + if next_item.is_some() || is_done { + Poll::Ready(next_item) + } else { + if force_wake { + cx.waker().wake_by_ref(); + } + + Poll::Pending + } + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<St, Item> Sink<Item> for FlattenUnordered<St> +where + St: Stream + Sink<Item>, +{ + type Error = St::Error; + + delegate_sink!(stream, Item); +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 86997f4..642b91e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -40,6 +40,10 @@ mod concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::concat::Concat; +mod count; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::count::Count; + mod cycle; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::cycle::Cycle; @@ -195,6 +199,25 @@ pub use self::buffered::Buffered; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] +mod flatten_unordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] +pub use self::flatten_unordered::FlattenUnordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +delegate_all!( + /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method. + FlatMapUnordered<St, U, F>( + FlattenUnordered<Map<St, F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)] + where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U +); + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] mod for_each_concurrent; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] @@ -386,9 +409,9 @@ pub trait StreamExt: Stream { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(1..=10); - /// let evens = stream.filter(|x| future::ready(x % 2 == 0)); + /// let events = stream.filter(|x| future::ready(x % 2 == 0)); /// - /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await); + /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await); /// # }); /// ``` fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> @@ -418,11 +441,11 @@ pub trait StreamExt: Stream { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(1..=10); - /// let evens = stream.filter_map(|x| async move { + /// let events = stream.filter_map(|x| async move { /// if x % 2 == 0 { Some(x + 1) } else { None } /// }); /// - /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await); + /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await); /// # }); /// ``` fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> @@ -576,6 +599,38 @@ pub trait StreamExt: Stream { assert_future::<Self::Item, _>(Concat::new(self)) } + /// Drives the stream to completion, counting the number of items. + /// + /// # Overflow Behavior + /// + /// The method does no guarding against overflows, so counting elements of a + /// stream with more than [`usize::MAX`] elements either produces the wrong + /// result or panics. If debug assertions are enabled, a panic is guaranteed. + /// + /// # Panics + /// + /// This function might panic if the iterator has more than [`usize::MAX`] + /// elements. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..=10); + /// let count = stream.count().await; + /// + /// assert_eq!(count, 10); + /// # }); + /// ``` + fn count(self) -> Count<Self> + where + Self: Sized, + { + assert_future::<usize, _>(Count::new(self)) + } + /// Repeats a stream endlessly. /// /// The stream never terminates. Note that you likely want to avoid @@ -718,13 +773,57 @@ pub trait StreamExt: Stream { assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) } + /// Flattens a stream of streams into just one continuous stream. Polls + /// inner streams concurrently. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::StreamExt; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(1).unwrap(); + /// tx1.unbounded_send(2).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(3).unwrap(); + /// tx2.unbounded_send(4).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(rx1).unwrap(); + /// tx3.unbounded_send(rx2).unwrap(); + /// }); + /// + /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await; + /// output.sort(); + /// + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> + where + Self::Item: Stream + Unpin, + Self: Sized, + { + FlattenUnordered::new(self, limit.into()) + } + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. /// /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, /// you would have to chain combinators like `.map(f).flatten()` while this /// combinator provides ability to write `.flat_map(f)` instead of chaining. /// - /// The provided closure which produce inner streams is executed over all elements + /// The provided closure which produces inner streams is executed over all elements /// of stream as last inner stream is terminated and next stream item is available. /// /// Note that this function consumes the stream passed into it and returns a @@ -752,6 +851,59 @@ pub trait StreamExt: Stream { assert_stream::<U::Item, _>(FlatMap::new(self, f)) } + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s + /// and polls them concurrently, yielding items in any order, as they made + /// available. + /// + /// [`StreamExt::map`] is very useful, but if it produces `Stream`s + /// instead, and you need to poll all of them concurrently, you would + /// have to use something like `for_each_concurrent` and merge values + /// by hand. This combinator provides ability to collect all values + /// from concurrently polled streams into one stream. + /// + /// The first argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled concurrently. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// The provided closure which produces inner streams is executed over + /// all elements of stream as next stream item is available and limit + /// of concurrently processed streams isn't exceeded. + /// + /// Note that this function consumes the stream passed into it and + /// returns a wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..5); + /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x])); + /// let mut values = stream.collect::<Vec<_>>().await; + /// values.sort(); + /// + /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn flat_map_unordered<U, F>( + self, + limit: impl Into<Option<usize>>, + f: F, + ) -> FlatMapUnordered<Self, U, F> + where + U: Stream + Unpin, + F: FnMut(Self::Item) -> U, + Self: Sized, + { + FlatMapUnordered::new(self, limit.into(), f) + } + /// Combinator similar to [`StreamExt::fold`] that holds internal state /// and produces a new stream. /// diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index 8724145..f5cfde9 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -118,11 +118,11 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl<S, Fut, F, Item> Sink<Item> for Scan<S, S, Fut, F> +impl<St, S, Fut, F, Item> Sink<Item> for Scan<St, S, Fut, F> where - S: Stream + Sink<Item>, + St: Stream + Sink<Item>, { - type Error = S::Error; + type Error = St::Error; delegate_sink!(stream, Item); } diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 455ddca..6bf2cb7 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -736,17 +736,21 @@ pub trait TryStreamExt: TryStream { /// thread::spawn(move || { /// tx2.unbounded_send(Ok(2)).unwrap(); /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); /// }); /// thread::spawn(move || { /// tx3.unbounded_send(Ok(rx1)).unwrap(); /// tx3.unbounded_send(Ok(rx2)).unwrap(); - /// tx3.unbounded_send(Err(4)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); /// }); /// /// let mut stream = rx3.try_flatten(); /// assert_eq!(stream.next().await, Some(Ok(1))); /// assert_eq!(stream.next().await, Some(Ok(2))); /// assert_eq!(stream.next().await, Some(Err(3))); + /// assert_eq!(stream.next().await, Some(Ok(4))); + /// assert_eq!(stream.next().await, Some(Err(5))); + /// assert_eq!(stream.next().await, None); /// # }); /// ``` fn try_flatten(self) -> TryFlatten<Self> @@ -1001,6 +1005,7 @@ pub trait TryStreamExt: TryStream { /// Wraps a [`TryStream`] into a stream compatible with libraries using /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled. /// ``` + /// # if cfg!(miri) { return; } // Miri does not support epoll /// use futures::future::{FutureExt, TryFutureExt}; /// # let (tx, rx) = futures::channel::oneshot::channel(); /// diff --git a/src/task/mod.rs b/src/task/mod.rs index eff6d48..0a31eea 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -16,7 +16,6 @@ pub use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj}; pub use futures_task::noop_waker; -#[cfg(feature = "std")] pub use futures_task::noop_waker_ref; #[cfg(not(futures_no_atomic_cas))] diff --git a/src/task/spawn.rs b/src/task/spawn.rs index f877923..87ca360 100644 --- a/src/task/spawn.rs +++ b/src/task/spawn.rs @@ -34,6 +34,7 @@ pub trait SpawnExt: Spawn { /// today. Feel free to use this method in the meantime. /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 /// use futures::executor::ThreadPool; /// use futures::task::SpawnExt; /// @@ -58,6 +59,7 @@ pub trait SpawnExt: Spawn { /// resolves to the output of the spawned future. /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 /// use futures::executor::{block_on, ThreadPool}; /// use futures::future; /// use futures::task::SpawnExt; @@ -136,6 +138,7 @@ pub trait LocalSpawnExt: LocalSpawn { /// resolves to the output of the spawned future. /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 /// use futures::executor::LocalPool; /// use futures::task::LocalSpawnExt; /// |