diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-03-06 00:05:18 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-03-06 00:05:18 +0000 |
commit | 6498cbaeda6441af022b6a13cc59719de0c4b18e (patch) | |
tree | e2532a639f92776d56f172cf560f5ba2f62642bc | |
parent | b56581940e3a82dd68fcce583aab850e459a3f8b (diff) | |
parent | 3049d36a1d1b1613a52f86adf1ea8408e5e3c837 (diff) | |
download | futures-android-vts-13.0_r7.tar.gz |
Snap for 8261343 from 3049d36a1d1b1613a52f86adf1ea8408e5e3c837 to tm-releaseandroid-vts-13.0_r8android-vts-13.0_r7android-vts-13.0_r6android-vts-13.0_r5android-vts-13.0_r4android-vts-13.0_r3android-vts-13.0_r2android-vts-13.0_r1android-security-13.0.0_r9android-security-13.0.0_r8android-security-13.0.0_r7android-security-13.0.0_r6android-security-13.0.0_r5android-security-13.0.0_r4android-security-13.0.0_r3android-security-13.0.0_r2android-security-13.0.0_r19android-security-13.0.0_r18android-security-13.0.0_r17android-security-13.0.0_r16android-security-13.0.0_r15android-security-13.0.0_r14android-security-13.0.0_r13android-security-13.0.0_r12android-security-13.0.0_r11android-security-13.0.0_r10android-security-13.0.0_r1android-platform-13.0.0_r9android-platform-13.0.0_r8android-platform-13.0.0_r7android-platform-13.0.0_r6android-platform-13.0.0_r5android-platform-13.0.0_r4android-platform-13.0.0_r3android-platform-13.0.0_r21android-platform-13.0.0_r20android-platform-13.0.0_r2android-platform-13.0.0_r19android-platform-13.0.0_r18android-platform-13.0.0_r17android-platform-13.0.0_r16android-platform-13.0.0_r15android-platform-13.0.0_r14android-platform-13.0.0_r13android-platform-13.0.0_r12android-platform-13.0.0_r11android-platform-13.0.0_r10android-platform-13.0.0_r1android-cts-13.0_r8android-cts-13.0_r7android-cts-13.0_r6android-cts-13.0_r5android-cts-13.0_r4android-cts-13.0_r3android-cts-13.0_r2android-cts-13.0_r1android-13.0.0_r8android-13.0.0_r7android-13.0.0_r6android-13.0.0_r5android-13.0.0_r4android-13.0.0_r31android-13.0.0_r3android-13.0.0_r2android-13.0.0_r12android-13.0.0_r1android13-tests-releaseandroid13-security-releaseandroid13-s3-releaseandroid13-s2-releaseandroid13-s1-releaseandroid13-releaseandroid13-platform-releaseandroid13-gsi
Change-Id: Ic3b4bf3de2d83d645ebd056e382e2e35ce3a5754
-rw-r--r-- | .cargo_vcs_info.json | 7 | ||||
-rw-r--r-- | Android.bp | 2 | ||||
-rw-r--r-- | Cargo.toml | 106 | ||||
-rw-r--r-- | Cargo.toml.orig | 20 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | src/lib.rs | 76 | ||||
-rw-r--r-- | tests/compat.rs | 1 | ||||
-rw-r--r-- | tests/eventual.rs | 2 | ||||
-rw-r--r-- | tests/future_join_all.rs | 25 | ||||
-rw-r--r-- | tests/future_shared.rs | 1 | ||||
-rw-r--r-- | tests/future_try_join_all.rs | 24 | ||||
-rw-r--r-- | tests/io_line_writer.rs | 73 | ||||
-rw-r--r-- | tests/lock_mutex.rs | 1 | ||||
-rw-r--r-- | tests/macro_comma_support.rs | 1 | ||||
-rw-r--r-- | tests/ready_queue.rs | 3 | ||||
-rw-r--r-- | tests/recurse.rs | 1 | ||||
-rw-r--r-- | tests/sink.rs | 1 | ||||
-rw-r--r-- | tests/stream.rs | 272 | ||||
-rw-r--r-- | tests/stream_futures_ordered.rs | 2 | ||||
-rw-r--r-- | tests/stream_futures_unordered.rs | 2 | ||||
-rw-r--r-- | tests/stream_try_stream.rs | 2 | ||||
-rw-r--r-- | tests/task_atomic_waker.rs | 1 |
22 files changed, 556 insertions, 77 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index ffd4f55..e483977 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,6 @@ { "git": { - "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010" - } -} + "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44" + }, + "path_in_vcs": "futures" +}
\ No newline at end of file @@ -42,7 +42,7 @@ rust_library { host_supported: true, crate_name: "futures", cargo_env_compat: true, - cargo_pkg_version: "0.3.17", + cargo_pkg_version: "0.3.21", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -11,53 +11,72 @@ [package] edition = "2018" +rust-version = "1.45" name = "futures" -version = "0.3.17" -authors = ["Alex Crichton <alex@alexcrichton.com>"] -description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n" +version = "0.3.21" +description = """ +An implementation of futures and streams featuring zero allocations, +composability, and iterator-like interfaces. +""" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures/0.3" readme = "../README.md" -keywords = ["futures", "async", "future"] +keywords = [ + "futures", + "async", + "future", +] categories = ["asynchronous"] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" + [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] [package.metadata.playground] -features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"] +features = [ + "std", + "async-await", + "compat", + "io-compat", + "executor", + "thread-pool", +] + [dependencies.futures-channel] -version = "0.3.17" +version = "0.3.21" features = ["sink"] default-features = false [dependencies.futures-core] -version = "0.3.17" +version = "0.3.21" default-features = false [dependencies.futures-executor] -version = "0.3.17" +version = "0.3.21" optional = true default-features = false [dependencies.futures-io] -version = "0.3.17" +version = "0.3.21" default-features = false [dependencies.futures-sink] -version = "0.3.17" +version = "0.3.21" default-features = false [dependencies.futures-task] -version = "0.3.17" +version = "0.3.21" default-features = false [dependencies.futures-util] -version = "0.3.17" +version = "0.3.21" features = ["sink"] default-features = false + [dev-dependencies.assert_matches] version = "1.3.0" @@ -74,16 +93,55 @@ version = "1" version = "0.1.11" [features] -alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"] -async-await = ["futures-util/async-await", "futures-util/async-await-macro"] +alloc = [ + "futures-core/alloc", + "futures-task/alloc", + "futures-sink/alloc", + "futures-channel/alloc", + "futures-util/alloc", +] +async-await = [ + "futures-util/async-await", + "futures-util/async-await-macro", +] bilock = ["futures-util/bilock"] cfg-target-has-atomic = [] -compat = ["std", "futures-util/compat"] -default = ["std", "async-await", "executor"] -executor = ["std", "futures-executor/std"] -io-compat = ["compat", "futures-util/io-compat"] -read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] -std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"] -thread-pool = ["executor", "futures-executor/thread-pool"] -unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] +compat = [ + "std", + "futures-util/compat", +] +default = [ + "std", + "async-await", + "executor", +] +executor = [ + "std", + "futures-executor/std", +] +io-compat = [ + "compat", + "futures-util/io-compat", +] +std = [ + "alloc", + "futures-core/std", + "futures-task/std", + "futures-io/std", + "futures-sink/std", + "futures-util/std", + "futures-util/io", + "futures-util/channel", +] +thread-pool = [ + "executor", + "futures-executor/thread-pool", +] +unstable = [ + "futures-core/unstable", + "futures-task/unstable", + "futures-channel/unstable", + "futures-io/unstable", + "futures-util/unstable", +] write-all-vectored = ["futures-util/write-all-vectored"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index b01b12e..6871f47 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,14 +1,13 @@ [package] name = "futures" +version = "0.3.21" edition = "2018" -version = "0.3.17" -authors = ["Alex Crichton <alex@alexcrichton.com>"] +rust-version = "1.45" license = "MIT OR Apache-2.0" readme = "../README.md" keywords = ["futures", "async", "future"] repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures/0.3" description = """ An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces. @@ -16,13 +15,13 @@ composability, and iterator-like interfaces. categories = ["asynchronous"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.17", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.17", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.17", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.17", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.17", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.21", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.21", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.21", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.21", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.21", default-features = false, features = ["sink"] } [dev-dependencies] futures-executor = { path = "../futures-executor", features = ["thread-pool"] } @@ -48,7 +47,6 @@ thread-pool = ["executor", "futures-executor/thread-pool"] # `unstable` feature as an explicit opt-in to unstable API. unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] bilock = ["futures-util/bilock"] -read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] write-all-vectored = ["futures-util/write-all-vectored"] # These features are no longer used. @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures/futures-0.3.17.crate" + value: "https://static.crates.io/crates/futures/futures-0.3.21.crate" } - version: "0.3.17" + version: "0.3.21" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 9 - day: 22 + year: 2022 + month: 3 + day: 1 } } @@ -25,6 +25,7 @@ //! within macros and keywords such as async and await!. //! //! ```rust +//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038 //! # use futures::channel::mpsc; //! # use futures::executor; ///standard executors to provide a context for futures and streams //! # use futures::executor::ThreadPool; @@ -78,7 +79,6 @@ //! The majority of examples and code snippets in this crate assume that they are //! inside an async block as written above. -#![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(not(feature = "std"), no_std)] #![warn( missing_debug_implementations, @@ -99,9 +99,6 @@ #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); -#[cfg(all(feature = "read-initializer", not(feature = "unstable")))] -compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[doc(no_inline)] pub use futures_core::future::{Future, TryFuture}; #[doc(no_inline)] @@ -154,13 +151,73 @@ pub use futures_util::io; #[cfg(feature = "executor")] #[cfg_attr(docsrs, doc(cfg(feature = "executor")))] -#[doc(inline)] -pub use futures_executor as executor; +pub mod executor { + //! Built-in executors and related tools. + //! + //! All asynchronous computation occurs within an executor, which is + //! capable of spawning futures as tasks. This module provides several + //! built-in executors, as well as tools for building your own. + //! + //! + //! This module is only available when the `executor` feature of this + //! library is activated. + //! + //! # Using a thread pool (M:N task scheduling) + //! + //! Most of the time tasks should be executed on a [thread pool](ThreadPool). + //! A small set of worker threads can handle a very large set of spawned tasks + //! (which are much lighter weight than threads). Tasks spawned onto the pool + //! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on + //! the created threads. + //! + //! # Spawning additional tasks + //! + //! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method + //! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used + //! instead. + //! + //! # Single-threaded execution + //! + //! In addition to thread pools, it's possible to run a task (and the tasks + //! it spawns) entirely within a single thread via the [`LocalPool`] executor. + //! Aside from cutting down on synchronization costs, this executor also makes + //! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The + //! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively + //! little work between I/O operations. + //! + //! There is also a convenience function [`block_on`] for simply running a + //! future to completion on the current thread. + //! + //! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj + //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj + + pub use futures_executor::{ + block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool, + LocalSpawner, + }; + + #[cfg(feature = "thread-pool")] + #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] + pub use futures_executor::{ThreadPool, ThreadPoolBuilder}; +} #[cfg(feature = "compat")] #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] -#[doc(inline)] -pub use futures_util::compat; +pub mod compat { + //! Interop between `futures` 0.1 and 0.3. + //! + //! This module is only available when the `compat` feature of this + //! library is activated. + + pub use futures_util::compat::{ + Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt, + Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt, + }; + + #[cfg(feature = "io-compat")] + #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] + pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt}; +} pub mod prelude { //! A "prelude" for crates using the `futures` crate. @@ -181,10 +238,12 @@ pub mod prelude { pub use crate::stream::{self, Stream, TryStream}; #[doc(no_inline)] + #[allow(unreachable_pub)] pub use crate::future::{FutureExt as _, TryFutureExt as _}; #[doc(no_inline)] pub use crate::sink::SinkExt as _; #[doc(no_inline)] + #[allow(unreachable_pub)] pub use crate::stream::{StreamExt as _, TryStreamExt as _}; #[cfg(feature = "std")] @@ -192,6 +251,7 @@ pub mod prelude { #[cfg(feature = "std")] #[doc(no_inline)] + #[allow(unreachable_pub)] pub use crate::io::{ AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _, }; diff --git a/tests/compat.rs b/tests/compat.rs index c4125d8..ac04a95 100644 --- a/tests/compat.rs +++ b/tests/compat.rs @@ -1,4 +1,5 @@ #![cfg(feature = "compat")] +#![cfg(not(miri))] // Miri does not support epoll use futures::compat::Future01CompatExt; use futures::prelude::*; diff --git a/tests/eventual.rs b/tests/eventual.rs index bff000d..3461380 100644 --- a/tests/eventual.rs +++ b/tests/eventual.rs @@ -1,3 +1,5 @@ +#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038 + use futures::channel::oneshot; use futures::executor::ThreadPool; use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; diff --git a/tests/future_join_all.rs b/tests/future_join_all.rs index ae05a21..44486e1 100644 --- a/tests/future_join_all.rs +++ b/tests/future_join_all.rs @@ -1,22 +1,24 @@ use futures::executor::block_on; use futures::future::{join_all, ready, Future, JoinAll}; +use futures::pin_mut; use std::fmt::Debug; -fn assert_done<T, F>(actual_fut: F, expected: T) +#[track_caller] +fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T) where T: PartialEq + Debug, - F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, { - let output = block_on(actual_fut()); + pin_mut!(actual_fut); + let output = block_on(actual_fut); assert_eq!(output, expected); } #[test] fn collect_collects() { - assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); - assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); + assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]); + assert_done(join_all(vec![ready(1)]), vec![1]); // REVIEW: should this be implemented? - // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]); + // assert_done(join_all(Vec::<i32>::new()), vec![]); // TODO: needs more tests } @@ -25,18 +27,15 @@ fn collect_collects() { fn join_all_iter_lifetime() { // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `JoinAll`. - fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> { + fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> { let iter = bufs.into_iter().map(|b| ready::<usize>(b.len())); - Box::new(join_all(iter)) + join_all(iter) } - assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]); + assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]); } #[test] fn join_all_from_iter() { - assert_done( - || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()), - vec![1, 2], - ) + assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2]) } diff --git a/tests/future_shared.rs b/tests/future_shared.rs index 718d6c4..3ceaebb 100644 --- a/tests/future_shared.rs +++ b/tests/future_shared.rs @@ -96,6 +96,7 @@ fn drop_in_poll() { assert_eq!(block_on(future1), 1); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn peek() { let mut local_pool = LocalPool::new(); diff --git a/tests/future_try_join_all.rs b/tests/future_try_join_all.rs index a4b3bb7..9a82487 100644 --- a/tests/future_try_join_all.rs +++ b/tests/future_try_join_all.rs @@ -1,24 +1,26 @@ use futures::executor::block_on; +use futures::pin_mut; use futures_util::future::{err, ok, try_join_all, TryJoinAll}; use std::fmt::Debug; use std::future::Future; -fn assert_done<T, F>(actual_fut: F, expected: T) +#[track_caller] +fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T) where T: PartialEq + Debug, - F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, { - let output = block_on(actual_fut()); + pin_mut!(actual_fut); + let output = block_on(actual_fut); assert_eq!(output, expected); } #[test] fn collect_collects() { - assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2])); - assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2)); - assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1])); + assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2])); + assert_done(try_join_all(vec![ok(1), err(2)]), Err(2)); + assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1])); // REVIEW: should this be implemented? - // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![])); + // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![])); // TODO: needs more tests } @@ -27,18 +29,18 @@ fn collect_collects() { fn try_join_all_iter_lifetime() { // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `TryJoinAll`. - fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> { + fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> { let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len())); - Box::new(try_join_all(iter)) + try_join_all(iter) } - assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); + assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); } #[test] fn try_join_all_from_iter() { assert_done( - || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()), + vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(), Ok::<_, usize>(vec![1, 2]), ) } diff --git a/tests/io_line_writer.rs b/tests/io_line_writer.rs new file mode 100644 index 0000000..b483e0f --- /dev/null +++ b/tests/io_line_writer.rs @@ -0,0 +1,73 @@ +use futures::executor::block_on; +use futures::io::{AsyncWriteExt, LineWriter}; +use std::io; + +#[test] +fn line_writer() { + let mut writer = LineWriter::new(Vec::new()); + + block_on(writer.write(&[0])).unwrap(); + assert_eq!(*writer.get_ref(), []); + + block_on(writer.write(&[1])).unwrap(); + assert_eq!(*writer.get_ref(), []); + + block_on(writer.flush()).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.write(&[0, b'\n', 1, b'\n', 2])).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']); + + block_on(writer.flush()).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]); + + block_on(writer.write(&[3, b'\n'])).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); +} + +#[test] +fn line_vectored() { + let mut line_writer = LineWriter::new(Vec::new()); + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(b"\n"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"a"), + ])) + .unwrap(), + 2 + ); + assert_eq!(line_writer.get_ref(), b"\n"); + + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(b"b"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"a"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"c"), + ])) + .unwrap(), + 3 + ); + assert_eq!(line_writer.get_ref(), b"\n"); + block_on(line_writer.flush()).unwrap(); + assert_eq!(line_writer.get_ref(), b"\nabac"); + assert_eq!(block_on(line_writer.write_vectored(&[])).unwrap(), 0); + + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + ])) + .unwrap(), + 0 + ); + + assert_eq!(block_on(line_writer.write_vectored(&[io::IoSlice::new(b"a\nb")])).unwrap(), 3); + assert_eq!(line_writer.get_ref(), b"\nabaca\nb"); +} diff --git a/tests/lock_mutex.rs b/tests/lock_mutex.rs index 7c33864..c92ef50 100644 --- a/tests/lock_mutex.rs +++ b/tests/lock_mutex.rs @@ -34,6 +34,7 @@ fn mutex_wakes_waiters() { assert!(waiter.poll_unpin(&mut panic_context()).is_ready()); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn mutex_contested() { let (tx, mut rx) = mpsc::unbounded(); diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs index 85871e9..3b082d2 100644 --- a/tests/macro_comma_support.rs +++ b/tests/macro_comma_support.rs @@ -14,6 +14,7 @@ fn ready() { })) } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn poll() { use futures::poll; diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs index 8290132..afba8f2 100644 --- a/tests/ready_queue.rs +++ b/tests/ready_queue.rs @@ -93,6 +93,9 @@ fn dropping_ready_queue() { #[test] fn stress() { + #[cfg(miri)] + const ITER: usize = 30; + #[cfg(not(miri))] const ITER: usize = 300; for i in 0..ITER { diff --git a/tests/recurse.rs b/tests/recurse.rs index d81753c..f06524f 100644 --- a/tests/recurse.rs +++ b/tests/recurse.rs @@ -3,6 +3,7 @@ use futures::future::{self, BoxFuture, FutureExt}; use std::sync::mpsc; use std::thread; +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn lots() { #[cfg(not(futures_sanitizer))] diff --git a/tests/sink.rs b/tests/sink.rs index f3cf11b..dc826bd 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -288,6 +288,7 @@ fn mpsc_blocking_start_send() { // test `flush` by using `with` to make the first insertion into a sink block // until a oneshot is completed +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn with_flush() { let (tx, rx) = oneshot::channel(); diff --git a/tests/stream.rs b/tests/stream.rs index 0d453d1..71ec654 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -1,10 +1,14 @@ +use std::iter; +use std::sync::Arc; + use futures::channel::mpsc; use futures::executor::block_on; use futures::future::{self, Future}; +use futures::lock::Mutex; use futures::sink::SinkExt; use futures::stream::{self, StreamExt}; use futures::task::Poll; -use futures::FutureExt; +use futures::{ready, FutureExt}; use futures_test::task::noop_context; #[test] @@ -50,6 +54,272 @@ fn scan() { } #[test] +fn flatten_unordered() { + use futures::executor::block_on; + use futures::stream::*; + use futures::task::*; + use std::convert::identity; + use std::pin::Pin; + use std::thread; + use std::time::Duration; + + struct DataStream { + data: Vec<u8>, + polled: bool, + wake_immediately: bool, + } + + impl Stream for DataStream { + type Item = u8; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { + if !self.polled { + if !self.wake_immediately { + let waker = ctx.waker().clone(); + let sleep_time = + Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10); + thread::spawn(move || { + thread::sleep(sleep_time); + waker.wake_by_ref(); + }); + } else { + ctx.waker().wake_by_ref(); + } + self.polled = true; + Poll::Pending + } else { + self.polled = false; + Poll::Ready(self.data.pop()) + } + } + } + + struct Interchanger { + polled: bool, + base: u8, + wake_immediately: bool, + } + + impl Stream for Interchanger { + type Item = DataStream; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { + if !self.polled { + self.polled = true; + if !self.wake_immediately { + let waker = ctx.waker().clone(); + let sleep_time = Duration::from_millis(self.base as u64); + thread::spawn(move || { + thread::sleep(sleep_time); + waker.wake_by_ref(); + }); + } else { + ctx.waker().wake_by_ref(); + } + Poll::Pending + } else { + let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect(); + self.base += 1; + self.polled = false; + Poll::Ready(Some(DataStream { + polled: false, + data, + wake_immediately: self.wake_immediately && self.base % 2 == 0, + })) + } + } + } + + // basic behaviour + { + block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(10..=12), + ]); + + let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await; + + assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]); + }); + + block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(0..=2), + ]); + + let mut fm_unordered = st + .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0))) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]); + }); + } + + // wake up immediately + { + block_on(async { + let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + .await; + + fl_unordered.sort_unstable(); + + assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + } + + // wake up after delay + { + block_on(async { + let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + .await; + + fl_unordered.sort_unstable(); + + assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let (mut fm_unordered, mut fl_unordered) = futures_util::join!( + Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>(), + Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + ); + + fm_unordered.sort_unstable(); + fl_unordered.sort_unstable(); + + assert_eq!(fm_unordered, fl_unordered); + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + } + + // waker panics + { + let stream = Arc::new(Mutex::new( + Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)), + )); + + struct PanicWaker; + + impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc<Self>) { + panic!("WAKE UP"); + } + } + + std::thread::spawn({ + let stream = stream.clone(); + move || { + let mut st = poll_fn(|cx| { + let mut lock = ready!(stream.lock().poll_unpin(cx)); + + let panic_waker = waker(Arc::new(PanicWaker)); + let mut panic_cx = Context::from_waker(&panic_waker); + let _ = ready!(lock.poll_next_unpin(&mut panic_cx)); + + Poll::Ready(Some(())) + }); + + block_on(st.next()) + } + }) + .join() + .unwrap_err(); + + block_on(async move { + let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; + values.sort_unstable(); + + assert_eq!(values, (0..60).collect::<Vec<u8>>()); + }); + } + + // stream panics + { + let st = stream::iter(iter::once( + once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(), + )) + .chain( + Interchanger { polled: false, base: 0, wake_immediately: true } + .map(|stream| stream.right_stream()) + .take(10), + ); + + let stream = Arc::new(Mutex::new(st.flatten_unordered(10))); + + std::thread::spawn({ + let stream = stream.clone(); + move || { + let mut st = poll_fn(|cx| { + let mut lock = ready!(stream.lock().poll_unpin(cx)); + let data = ready!(lock.poll_next_unpin(cx)); + + Poll::Ready(data) + }); + + block_on(st.next()) + } + }) + .join() + .unwrap_err(); + + block_on(async move { + let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; + values.sort_unstable(); + + assert_eq!(values, (0..60).collect::<Vec<u8>>()); + }); + } +} + +#[test] fn take_until() { fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> { let mut i = 0; diff --git a/tests/stream_futures_ordered.rs b/tests/stream_futures_ordered.rs index 7506c65..84e0bcc 100644 --- a/tests/stream_futures_ordered.rs +++ b/tests/stream_futures_ordered.rs @@ -26,6 +26,7 @@ fn works_1() { assert_eq!(None, iter.next()); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { let (a_tx, a_rx) = oneshot::channel::<i32>(); @@ -54,6 +55,7 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn queue_never_unblocked() { let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>(); diff --git a/tests/stream_futures_unordered.rs b/tests/stream_futures_unordered.rs index 4b9afcc..f62f733 100644 --- a/tests/stream_futures_unordered.rs +++ b/tests/stream_futures_unordered.rs @@ -56,6 +56,7 @@ fn works_1() { assert_eq!(None, iter.next()); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn works_2() { let (a_tx, a_rx) = oneshot::channel::<i32>(); @@ -85,6 +86,7 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); } +#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038 #[test] fn finished_future() { let (_a_tx, a_rx) = oneshot::channel::<i32>(); diff --git a/tests/stream_try_stream.rs b/tests/stream_try_stream.rs index 194e74d..d83fc54 100644 --- a/tests/stream_try_stream.rs +++ b/tests/stream_try_stream.rs @@ -1,3 +1,5 @@ +#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038 + use futures::{ stream::{self, StreamExt, TryStreamExt}, task::Poll, diff --git a/tests/task_atomic_waker.rs b/tests/task_atomic_waker.rs index cec3db2..2d1612a 100644 --- a/tests/task_atomic_waker.rs +++ b/tests/task_atomic_waker.rs @@ -6,6 +6,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread; +#[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn basic() { let atomic_waker = Arc::new(AtomicWaker::new()); |