aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2023-07-07 01:02:56 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2023-07-07 01:02:56 +0000
commit704d500bda9656a5fa63d1bd28f7b9f5cea7cc79 (patch)
tree96de8e195ba005a8d64963e5017b1fb8aa552d5a
parentf9836ad75ee0e253f59bed0df3e95b1ac5525f73 (diff)
parent34fcaa0dc06597fff16219d2b32fd17dbf4f6c51 (diff)
downloadfutures-android14-mainline-cellbroadcast-release.tar.gz
Change-Id: I0c41bea41c9a62b63564783345860bd759abee19
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp6
-rw-r--r--Cargo.toml20
-rw-r--r--Cargo.toml.orig18
-rw-r--r--METADATA14
-rw-r--r--README.md61
-rw-r--r--TEST_MAPPING26
-rw-r--r--cargo2android.json2
-rw-r--r--src/lib.rs4
-rw-r--r--tests/async_await_macros.rs12
-rw-r--r--tests/auto_traits.rs8
-rw-r--r--tests/eventual.rs24
-rw-r--r--tests/future_join.rs32
-rw-r--r--tests/future_shared.rs79
-rw-r--r--tests/lock_mutex.rs50
-rw-r--r--tests/macro_comma_support.rs1
-rw-r--r--tests/ready_queue.rs5
-rw-r--r--tests/recurse.rs1
-rw-r--r--tests/sink.rs3
-rw-r--r--tests/stream.rs42
-rw-r--r--tests/stream_futures_ordered.rs90
-rw-r--r--tests/stream_futures_unordered.rs16
-rw-r--r--tests/stream_try_stream.rs2
-rw-r--r--tests/task_atomic_waker.rs1
-rw-r--r--tests_disabled/stream.rs1
25 files changed, 424 insertions, 96 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index e483977..c5e5d7e 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+ "sha1": "5e3693a350f96244151081d2c030208cd15f9572"
},
"path_in_vcs": "futures"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index a594b53..2a41329 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@ rust_library {
host_supported: true,
crate_name: "futures",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.21",
+ cargo_pkg_version: "0.3.26",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -64,9 +64,11 @@ rust_library {
],
apex_available: [
"//apex_available:platform",
- "com.android.bluetooth",
+ "com.android.btservices",
"com.android.resolv",
"com.android.virt",
],
+ product_available: true,
+ vendor_available: true,
min_sdk_version: "29",
}
diff --git a/Cargo.toml b/Cargo.toml
index f740f96..51e052a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,13 +13,13 @@
edition = "2018"
rust-version = "1.45"
name = "futures"
-version = "0.3.21"
+version = "0.3.26"
description = """
An implementation of futures and streams featuring zero allocations,
composability, and iterator-like interfaces.
"""
homepage = "https://rust-lang.github.io/futures-rs"
-readme = "../README.md"
+readme = "README.md"
keywords = [
"futures",
"async",
@@ -47,33 +47,33 @@ features = [
]
[dependencies.futures-channel]
-version = "0.3.21"
+version = "0.3.26"
features = ["sink"]
default-features = false
[dependencies.futures-core]
-version = "0.3.21"
+version = "0.3.26"
default-features = false
[dependencies.futures-executor]
-version = "0.3.21"
+version = "0.3.26"
optional = true
default-features = false
[dependencies.futures-io]
-version = "0.3.21"
+version = "0.3.26"
default-features = false
[dependencies.futures-sink]
-version = "0.3.21"
+version = "0.3.26"
default-features = false
[dependencies.futures-task]
-version = "0.3.21"
+version = "0.3.26"
default-features = false
[dependencies.futures-util]
-version = "0.3.21"
+version = "0.3.26"
features = ["sink"]
default-features = false
@@ -81,7 +81,7 @@ default-features = false
version = "1.3.0"
[dev-dependencies.pin-project]
-version = "1.0.1"
+version = "1.0.11"
[dev-dependencies.pin-utils]
version = "0.1.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 6871f47..e7a5f38 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,6 +1,6 @@
[package]
name = "futures"
-version = "0.3.21"
+version = "0.3.26"
edition = "2018"
rust-version = "1.45"
license = "MIT OR Apache-2.0"
@@ -15,19 +15,19 @@ composability, and iterator-like interfaces.
categories = ["asynchronous"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.21", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.21", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.21", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.26", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.26", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.26", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.26", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.26", default-features = false, features = ["sink"] }
[dev-dependencies]
futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
futures-test = { path = "../futures-test" }
assert_matches = "1.3.0"
-pin-project = "1.0.1"
+pin-project = "1.0.11"
pin-utils = "0.1.0"
static_assertions = "1"
tokio = "0.1.11"
diff --git a/METADATA b/METADATA
index 554f4df..240fa20 100644
--- a/METADATA
+++ b/METADATA
@@ -1,3 +1,7 @@
+# This project was upgraded with external_updater.
+# Usage: tools/external_updater/updater.sh update rust/crates/futures
+# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+
name: "futures"
description: "An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces."
third_party {
@@ -7,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures/futures-0.3.21.crate"
+ value: "https://static.crates.io/crates/futures/futures-0.3.26.crate"
}
- version: "0.3.21"
+ version: "0.3.26"
license_type: NOTICE
last_upgrade_date {
- year: 2022
- month: 3
- day: 1
+ year: 2023
+ month: 2
+ day: 15
}
}
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..45e1f5b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,61 @@
+<p align="center">
+ <img alt="futures-rs" src="https://raw.githubusercontent.com/rust-lang/futures-rs/gh-pages/assets/images/futures-rs-logo.svg?sanitize=true" width="400">
+</p>
+
+<p align="center">
+ Zero-cost asynchronous programming in Rust
+</p>
+
+<p align="center">
+ <a href="https://github.com/rust-lang/futures-rs/actions?query=branch%3Amaster">
+ <img alt="Build Status" src="https://img.shields.io/github/actions/workflow/status/rust-lang/futures-rs/ci.yml?branch=master">
+ </a>
+
+ <a href="https://crates.io/crates/futures">
+ <img alt="crates.io" src="https://img.shields.io/crates/v/futures.svg">
+ </a>
+</p>
+
+<p align="center">
+ <a href="https://docs.rs/futures">
+ Documentation
+ </a> | <a href="https://rust-lang.github.io/futures-rs/">
+ Website
+ </a>
+</p>
+
+`futures-rs` is a library providing the foundations for asynchronous programming in Rust.
+It includes key trait definitions like `Stream`, as well as utilities like `join!`,
+`select!`, and various futures combinator methods which enable expressive asynchronous
+control flow.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures = "0.3"
+```
+
+The current `futures` requires Rust 1.45 or later.
+
+### Feature `std`
+
+Futures-rs works without the standard library, such as in bare metal environments.
+However, it has a significantly reduced API surface. To use futures-rs in
+a `#[no_std]` environment, use:
+
+```toml
+[dependencies]
+futures = { version = "0.3", default-features = false }
+```
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
diff --git a/TEST_MAPPING b/TEST_MAPPING
index e2df61d..f20282b 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -5,35 +5,31 @@
"path": "external/rust/crates/anyhow"
},
{
- "path": "external/rust/crates/tokio"
- }
- ],
- "presubmit": [
+ "path": "external/rust/crates/futures-channel"
+ },
{
- "name": "ZipFuseTest"
+ "path": "external/rust/crates/futures-executor"
},
{
- "name": "authfs_device_test_src_lib"
+ "path": "external/rust/crates/tokio"
},
{
- "name": "doh_unit_test"
+ "path": "packages/modules/DnsResolver"
},
{
- "name": "virtualizationservice_device_test"
- }
- ],
- "presubmit-rust": [
+ "path": "packages/modules/Virtualization/authfs"
+ },
{
- "name": "ZipFuseTest"
+ "path": "packages/modules/Virtualization/virtualizationmanager"
},
{
- "name": "authfs_device_test_src_lib"
+ "path": "packages/modules/Virtualization/zipfuse"
},
{
- "name": "doh_unit_test"
+ "path": "system/security/keystore2"
},
{
- "name": "virtualizationservice_device_test"
+ "path": "system/security/keystore2/legacykeystore"
}
]
}
diff --git a/cargo2android.json b/cargo2android.json
index a7e2a4b..7b27fc0 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -1,7 +1,7 @@
{
"apex-available": [
"//apex_available:platform",
- "com.android.bluetooth",
+ "com.android.btservices",
"com.android.resolv",
"com.android.virt"
],
diff --git a/src/lib.rs b/src/lib.rs
index b8ebc61..b972f51 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,13 +25,13 @@
//! within macros and keywords such as async and await!.
//!
//! ```rust
-//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
//! # use futures::channel::mpsc;
//! # use futures::executor; ///standard executors to provide a context for futures and streams
//! # use futures::executor::ThreadPool;
//! # use futures::StreamExt;
//! #
//! fn main() {
+//! # {
//! let pool = ThreadPool::new().expect("Failed to build pool");
//! let (tx, rx) = mpsc::unbounded::<i32>();
//!
@@ -73,6 +73,8 @@
//! let values: Vec<i32> = executor::block_on(fut_values);
//!
//! println!("Values={:?}", values);
+//! # }
+//! # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
//! }
//! ```
//!
diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs
index ce1f3a3..82a617f 100644
--- a/tests/async_await_macros.rs
+++ b/tests/async_await_macros.rs
@@ -346,43 +346,47 @@ fn stream_select() {
});
}
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn join_size() {
let fut = async {
let ready = future::ready(0i32);
join!(ready)
};
- assert_eq!(mem::size_of_val(&fut), 16);
+ assert_eq!(mem::size_of_val(&fut), 24);
let fut = async {
let ready1 = future::ready(0i32);
let ready2 = future::ready(0i32);
join!(ready1, ready2)
};
- assert_eq!(mem::size_of_val(&fut), 28);
+ assert_eq!(mem::size_of_val(&fut), 40);
}
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn try_join_size() {
let fut = async {
let ready = future::ready(Ok::<i32, i32>(0));
try_join!(ready)
};
- assert_eq!(mem::size_of_val(&fut), 16);
+ assert_eq!(mem::size_of_val(&fut), 24);
let fut = async {
let ready1 = future::ready(Ok::<i32, i32>(0));
let ready2 = future::ready(Ok::<i32, i32>(0));
try_join!(ready1, ready2)
};
- assert_eq!(mem::size_of_val(&fut), 28);
+ assert_eq!(mem::size_of_val(&fut), 48);
}
+#[allow(clippy::let_underscore_future)]
#[test]
fn join_doesnt_require_unpin() {
let _ = async { join!(async {}, async {}) };
}
+#[allow(clippy::let_underscore_future)]
#[test]
fn try_join_doesnt_require_unpin() {
let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) };
diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs
index b3d8b00..5fc0f7d 100644
--- a/tests/auto_traits.rs
+++ b/tests/auto_traits.rs
@@ -576,10 +576,10 @@ pub mod future {
// TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin
- assert_impl!(TryJoinAll<SendTryFuture<()>>: Send);
+ assert_impl!(TryJoinAll<SendTryFuture<(), ()>>: Send);
assert_not_impl!(TryJoinAll<LocalTryFuture>: Send);
assert_not_impl!(TryJoinAll<SendTryFuture>: Send);
- assert_impl!(TryJoinAll<SyncTryFuture<()>>: Sync);
+ assert_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync);
assert_not_impl!(TryJoinAll<LocalTryFuture>: Sync);
assert_not_impl!(TryJoinAll<SyncTryFuture>: Sync);
assert_impl!(TryJoinAll<PinnedTryFuture>: Unpin);
@@ -1480,10 +1480,10 @@ pub mod stream {
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
assert_impl!(ReadyChunks<SendStream<()>>: Send);
- assert_not_impl!(ReadyChunks<SendStream>: Send);
+ assert_impl!(ReadyChunks<SendStream>: Send);
assert_not_impl!(ReadyChunks<LocalStream>: Send);
assert_impl!(ReadyChunks<SyncStream<()>>: Sync);
- assert_not_impl!(ReadyChunks<SyncStream>: Sync);
+ assert_impl!(ReadyChunks<SyncStream>: Sync);
assert_not_impl!(ReadyChunks<LocalStream>: Sync);
assert_impl!(ReadyChunks<UnpinStream>: Unpin);
assert_not_impl!(ReadyChunks<PinnedStream>: Unpin);
diff --git a/tests/eventual.rs b/tests/eventual.rs
index 3461380..57a49b2 100644
--- a/tests/eventual.rs
+++ b/tests/eventual.rs
@@ -1,5 +1,3 @@
-#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
-
use futures::channel::oneshot;
use futures::executor::ThreadPool;
use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
@@ -18,6 +16,8 @@ fn join1() {
run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
assert_eq!(rx.recv(), Ok((1, 2)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -32,6 +32,8 @@ fn join2() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok((1, 2)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -45,6 +47,8 @@ fn join3() {
assert_eq!(rx.recv(), Ok(1));
assert!(rx.recv().is_err());
drop(c2);
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -58,6 +62,8 @@ fn join4() {
assert!(rx.recv().is_ok());
drop(c2);
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -75,6 +81,8 @@ fn join5() {
c3.send(3).unwrap();
assert_eq!(rx.recv(), Ok(((1, 2), 3)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -94,6 +102,8 @@ fn select1() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -113,6 +123,8 @@ fn select2() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -132,10 +144,14 @@ fn select3() {
drop(c2);
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
fn select4() {
+ const N: usize = if cfg!(miri) { 100 } else { 10000 };
+
let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
let t = thread::spawn(move || {
@@ -145,7 +161,7 @@ fn select4() {
});
let (tx2, rx2) = mpsc::channel();
- for _ in 0..10000 {
+ for _ in 0..N {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
@@ -158,4 +174,6 @@ fn select4() {
drop(tx);
t.join().unwrap();
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
diff --git a/tests/future_join.rs b/tests/future_join.rs
new file mode 100644
index 0000000..f5df9d7
--- /dev/null
+++ b/tests/future_join.rs
@@ -0,0 +1,32 @@
+use futures::executor::block_on;
+use futures::future::Future;
+use std::task::Poll;
+
+/// This tests verifies (through miri) that self-referencing
+/// futures are not invalidated when joining them.
+#[test]
+fn futures_join_macro_self_referential() {
+ block_on(async { futures::join!(yield_now(), trouble()) });
+}
+
+async fn trouble() {
+ let lucky_number = 42;
+ let problematic_variable = &lucky_number;
+
+ yield_now().await;
+
+ // problematic dereference
+ let _ = { *problematic_variable };
+}
+
+fn yield_now() -> impl Future<Output = ()> {
+ let mut yielded = false;
+ std::future::poll_fn(move |cx| {
+ if core::mem::replace(&mut yielded, true) {
+ Poll::Ready(())
+ } else {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ })
+}
diff --git a/tests/future_shared.rs b/tests/future_shared.rs
index 3ceaebb..bd69c1d 100644
--- a/tests/future_shared.rs
+++ b/tests/future_shared.rs
@@ -3,6 +3,7 @@ use futures::executor::{block_on, LocalPool};
use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt};
use futures::task::LocalSpawn;
use std::cell::{Cell, RefCell};
+use std::panic::AssertUnwindSafe;
use std::rc::Rc;
use std::task::Poll;
use std::thread;
@@ -96,7 +97,6 @@ fn drop_in_poll() {
assert_eq!(block_on(future1), 1);
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn peek() {
let mut local_pool = LocalPool::new();
@@ -152,6 +152,52 @@ fn downgrade() {
}
#[test]
+fn ptr_eq() {
+ use future::FusedFuture;
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::Hasher;
+
+ let (tx, rx) = oneshot::channel::<i32>();
+ let shared = rx.shared();
+ let mut shared2 = shared.clone();
+ let mut hasher = DefaultHasher::new();
+ let mut hasher2 = DefaultHasher::new();
+
+ // Because these two futures share the same underlying future,
+ // `ptr_eq` should return true.
+ assert!(shared.ptr_eq(&shared2));
+ // Equivalence relations are symmetric
+ assert!(shared2.ptr_eq(&shared));
+
+ // If `ptr_eq` returns true, they should hash to the same value.
+ shared.ptr_hash(&mut hasher);
+ shared2.ptr_hash(&mut hasher2);
+ assert_eq!(hasher.finish(), hasher2.finish());
+
+ tx.send(42).unwrap();
+ assert_eq!(block_on(&mut shared2).unwrap(), 42);
+
+ // Now that `shared2` has completed, `ptr_eq` should return false.
+ assert!(shared2.is_terminated());
+ assert!(!shared.ptr_eq(&shared2));
+
+ // `ptr_eq` should continue to work for the other `Shared`.
+ let shared3 = shared.clone();
+ let mut hasher3 = DefaultHasher::new();
+ assert!(shared.ptr_eq(&shared3));
+
+ shared3.ptr_hash(&mut hasher3);
+ assert_eq!(hasher.finish(), hasher3.finish());
+
+ let (_tx, rx) = oneshot::channel::<i32>();
+ let shared4 = rx.shared();
+
+ // And `ptr_eq` should return false for two futures that don't share
+ // the underlying future.
+ assert!(!shared.ptr_eq(&shared4));
+}
+
+#[test]
fn dont_clone_in_single_owner_shared_future() {
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -194,3 +240,34 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() {
// has returned pending
assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ()));
}
+
+#[test]
+#[should_panic(expected = "inner future panicked during poll")]
+fn panic_while_poll() {
+ let fut = futures::future::poll_fn::<i8, _>(|_cx| panic!("test")).shared();
+
+ let fut_captured = fut.clone();
+ std::panic::catch_unwind(AssertUnwindSafe(|| {
+ block_on(fut_captured);
+ }))
+ .unwrap_err();
+
+ block_on(fut);
+}
+
+#[test]
+#[should_panic(expected = "test_marker")]
+fn poll_while_panic() {
+ struct S;
+
+ impl Drop for S {
+ fn drop(&mut self) {
+ let fut = futures::future::ready(1).shared();
+ assert_eq!(block_on(fut.clone()), 1);
+ assert_eq!(block_on(fut), 1);
+ }
+ }
+
+ let _s = S {};
+ panic!("test_marker");
+}
diff --git a/tests/lock_mutex.rs b/tests/lock_mutex.rs
index c92ef50..c15e76b 100644
--- a/tests/lock_mutex.rs
+++ b/tests/lock_mutex.rs
@@ -34,34 +34,36 @@ fn mutex_wakes_waiters() {
assert!(waiter.poll_unpin(&mut panic_context()).is_ready());
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn mutex_contested() {
- let (tx, mut rx) = mpsc::unbounded();
- let pool = ThreadPool::builder().pool_size(16).create().unwrap();
+ {
+ let (tx, mut rx) = mpsc::unbounded();
+ let pool = ThreadPool::builder().pool_size(16).create().unwrap();
- let tx = Arc::new(tx);
- let mutex = Arc::new(Mutex::new(0));
+ let tx = Arc::new(tx);
+ let mutex = Arc::new(Mutex::new(0));
- let num_tasks = 1000;
- for _ in 0..num_tasks {
- let tx = tx.clone();
- let mutex = mutex.clone();
- pool.spawn(async move {
- let mut lock = mutex.lock().await;
- ready(()).pending_once().await;
- *lock += 1;
- tx.unbounded_send(()).unwrap();
- drop(lock);
- })
- .unwrap();
- }
-
- block_on(async {
+ let num_tasks = 1000;
for _ in 0..num_tasks {
- rx.next().await.unwrap();
+ let tx = tx.clone();
+ let mutex = mutex.clone();
+ pool.spawn(async move {
+ let mut lock = mutex.lock().await;
+ ready(()).pending_once().await;
+ *lock += 1;
+ tx.unbounded_send(()).unwrap();
+ drop(lock);
+ })
+ .unwrap();
}
- let lock = mutex.lock().await;
- assert_eq!(num_tasks, *lock);
- })
+
+ block_on(async {
+ for _ in 0..num_tasks {
+ rx.next().await.unwrap();
+ }
+ let lock = mutex.lock().await;
+ assert_eq!(num_tasks, *lock);
+ });
+ }
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs
index 3b082d2..85871e9 100644
--- a/tests/macro_comma_support.rs
+++ b/tests/macro_comma_support.rs
@@ -14,7 +14,6 @@ fn ready() {
}))
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn poll() {
use futures::poll;
diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs
index afba8f2..c19d625 100644
--- a/tests/ready_queue.rs
+++ b/tests/ready_queue.rs
@@ -93,10 +93,7 @@ fn dropping_ready_queue() {
#[test]
fn stress() {
- #[cfg(miri)]
- const ITER: usize = 30;
- #[cfg(not(miri))]
- const ITER: usize = 300;
+ const ITER: usize = if cfg!(miri) { 30 } else { 300 };
for i in 0..ITER {
let n = (i % 10) + 1;
diff --git a/tests/recurse.rs b/tests/recurse.rs
index f06524f..d81753c 100644
--- a/tests/recurse.rs
+++ b/tests/recurse.rs
@@ -3,7 +3,6 @@ use futures::future::{self, BoxFuture, FutureExt};
use std::sync::mpsc;
use std::thread;
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn lots() {
#[cfg(not(futures_sanitizer))]
diff --git a/tests/sink.rs b/tests/sink.rs
index dc826bd..5b691e7 100644
--- a/tests/sink.rs
+++ b/tests/sink.rs
@@ -138,7 +138,7 @@ impl<T: Unpin> ManualFlush<T> {
for task in self.waiting_tasks.drain(..) {
task.wake()
}
- mem::replace(&mut self.data, Vec::new())
+ mem::take(&mut self.data)
}
}
@@ -288,7 +288,6 @@ fn mpsc_blocking_start_send() {
// test `flush` by using `with` to make the first insertion into a sink block
// until a oneshot is completed
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn with_flush() {
let (tx, rx) = oneshot::channel();
diff --git a/tests/stream.rs b/tests/stream.rs
index 71ec654..5cde458 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -1,5 +1,9 @@
+use std::cell::Cell;
use std::iter;
+use std::pin::Pin;
+use std::rc::Rc;
use std::sync::Arc;
+use std::task::Context;
use futures::channel::mpsc;
use futures::executor::block_on;
@@ -9,6 +13,7 @@ use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use futures::{ready, FutureExt};
+use futures_core::Stream;
use futures_test::task::noop_context;
#[test]
@@ -419,3 +424,40 @@ fn ready_chunks() {
assert_eq!(s.next().await.unwrap(), vec![4]);
});
}
+
+struct SlowStream {
+ times_should_poll: usize,
+ times_polled: Rc<Cell<usize>>,
+}
+impl Stream for SlowStream {
+ type Item = usize;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.times_polled.set(self.times_polled.get() + 1);
+ if self.times_polled.get() % 2 == 0 {
+ cx.waker().wake_by_ref();
+ return Poll::Pending;
+ }
+ if self.times_polled.get() >= self.times_should_poll {
+ return Poll::Ready(None);
+ }
+ Poll::Ready(Some(self.times_polled.get()))
+ }
+}
+
+#[test]
+fn select_with_strategy_doesnt_terminate_early() {
+ for side in [stream::PollNext::Left, stream::PollNext::Right] {
+ let times_should_poll = 10;
+ let count = Rc::new(Cell::new(0));
+ let b = stream::iter([10, 20]);
+
+ let mut selected = stream::select_with_strategy(
+ SlowStream { times_should_poll, times_polled: count.clone() },
+ b,
+ |_: &mut ()| side,
+ );
+ block_on(async move { while selected.next().await.is_some() {} });
+ assert_eq!(count.get(), times_should_poll + 1);
+ }
+}
diff --git a/tests/stream_futures_ordered.rs b/tests/stream_futures_ordered.rs
index 84e0bcc..5a4a3e2 100644
--- a/tests/stream_futures_ordered.rs
+++ b/tests/stream_futures_ordered.rs
@@ -2,6 +2,7 @@ use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
use futures::stream::{FuturesOrdered, StreamExt};
+use futures::task::Poll;
use futures_test::task::noop_context;
use std::any::Any;
@@ -26,7 +27,6 @@ fn works_1() {
assert_eq!(None, iter.next());
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn works_2() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -47,6 +47,69 @@ fn works_2() {
}
#[test]
+fn test_push_front() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_back(a_rx);
+ stream.push_back(b_rx);
+ stream.push_back(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // 1 and 2 should be received in order
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+
+ stream.push_front(d_rx);
+ d_tx.send(4).unwrap();
+
+ // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next
+ // and then 3 after it
+ assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+}
+
+#[test]
+fn test_push_back() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_back(a_rx);
+ stream.push_back(b_rx);
+ stream.push_back(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // All results should be received in order
+
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+
+ stream.push_back(d_rx);
+ d_tx.send(4).unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
+}
+
+#[test]
fn from_iterator() {
let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
.into_iter()
@@ -55,7 +118,6 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn queue_never_unblocked() {
let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
@@ -84,3 +146,27 @@ fn queue_never_unblocked() {
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}
+
+#[test]
+fn test_push_front_negative() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_front(a_rx);
+ stream.push_front(b_rx);
+ stream.push_front(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // These should all be recieved in reverse order
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+}
diff --git a/tests/stream_futures_unordered.rs b/tests/stream_futures_unordered.rs
index f62f733..b568280 100644
--- a/tests/stream_futures_unordered.rs
+++ b/tests/stream_futures_unordered.rs
@@ -56,7 +56,6 @@ fn works_1() {
assert_eq!(None, iter.next());
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn works_2() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -86,7 +85,6 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
-#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn finished_future() {
let (_a_tx, a_rx) = oneshot::channel::<i32>();
@@ -262,6 +260,20 @@ fn into_iter_len() {
}
#[test]
+fn into_iter_partial() {
+ let stream = vec![future::ready(1), future::ready(2), future::ready(3), future::ready(4)]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let mut into_iter = stream.into_iter();
+ assert!(into_iter.next().is_some());
+ assert!(into_iter.next().is_some());
+ assert!(into_iter.next().is_some());
+ assert_eq!(into_iter.len(), 1);
+ // don't panic when iterator is dropped before completing
+}
+
+#[test]
fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
// asserting that it does not move.
diff --git a/tests/stream_try_stream.rs b/tests/stream_try_stream.rs
index d83fc54..194e74d 100644
--- a/tests/stream_try_stream.rs
+++ b/tests/stream_try_stream.rs
@@ -1,5 +1,3 @@
-#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
-
use futures::{
stream::{self, StreamExt, TryStreamExt},
task::Poll,
diff --git a/tests/task_atomic_waker.rs b/tests/task_atomic_waker.rs
index 2d1612a..cec3db2 100644
--- a/tests/task_atomic_waker.rs
+++ b/tests/task_atomic_waker.rs
@@ -6,7 +6,6 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
-#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn basic() {
let atomic_waker = Arc::new(AtomicWaker::new());
diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs
index 854dbad..a4eec2c 100644
--- a/tests_disabled/stream.rs
+++ b/tests_disabled/stream.rs
@@ -318,7 +318,6 @@ fn forward() {
}
#[test]
-#[allow(deprecated)]
fn concat() {
let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));