aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp2
-rw-r--r--Cargo.toml18
-rw-r--r--Cargo.toml.orig18
-rw-r--r--METADATA23
-rw-r--r--README.md2
-rw-r--r--tests/auto_traits.rs43
-rw-r--r--tests/bilock.rs (renamed from tests_disabled/bilock.rs)68
-rw-r--r--tests/stream.rs114
-rw-r--r--tests/stream_futures_unordered.rs25
-rw-r--r--tests/stream_try_stream.rs147
11 files changed, 376 insertions, 86 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index c5e5d7e..f60a8e9 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "5e3693a350f96244151081d2c030208cd15f9572"
+ "sha1": "de1a0fd64a1bcae9a1534ed4da1699632993cc26"
},
"path_in_vcs": "futures"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 46c18a2..f6f44d5 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@ rust_library {
host_supported: true,
crate_name: "futures",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.26",
+ cargo_pkg_version: "0.3.30",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/Cargo.toml b/Cargo.toml
index 51e052a..5272299 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,9 +11,9 @@
[package]
edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
name = "futures"
-version = "0.3.26"
+version = "0.3.30"
description = """
An implementation of futures and streams featuring zero allocations,
composability, and iterator-like interfaces.
@@ -47,33 +47,33 @@ features = [
]
[dependencies.futures-channel]
-version = "0.3.26"
+version = "0.3.30"
features = ["sink"]
default-features = false
[dependencies.futures-core]
-version = "0.3.26"
+version = "0.3.30"
default-features = false
[dependencies.futures-executor]
-version = "0.3.26"
+version = "0.3.30"
optional = true
default-features = false
[dependencies.futures-io]
-version = "0.3.26"
+version = "0.3.30"
default-features = false
[dependencies.futures-sink]
-version = "0.3.26"
+version = "0.3.30"
default-features = false
[dependencies.futures-task]
-version = "0.3.26"
+version = "0.3.30"
default-features = false
[dependencies.futures-util]
-version = "0.3.26"
+version = "0.3.30"
features = ["sink"]
default-features = false
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index e7a5f38..6208f61 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,8 +1,8 @@
[package]
name = "futures"
-version = "0.3.26"
+version = "0.3.30"
edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
license = "MIT OR Apache-2.0"
readme = "../README.md"
keywords = ["futures", "async", "future"]
@@ -15,13 +15,13 @@ composability, and iterator-like interfaces.
categories = ["asynchronous"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.26", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.26", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.26", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.26", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.26", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.30", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.30", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.30", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.30", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.30", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.30", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.30", default-features = false, features = ["sink"] }
[dev-dependencies]
futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
diff --git a/METADATA b/METADATA
index 240fa20..06699ef 100644
--- a/METADATA
+++ b/METADATA
@@ -1,23 +1,20 @@
# This project was upgraded with external_updater.
-# Usage: tools/external_updater/updater.sh update rust/crates/futures
-# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+# Usage: tools/external_updater/updater.sh update external/rust/crates/futures
+# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md
name: "futures"
description: "An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces."
third_party {
- url {
- type: HOMEPAGE
- value: "https://crates.io/crates/futures"
- }
- url {
- type: ARCHIVE
- value: "https://static.crates.io/crates/futures/futures-0.3.26.crate"
- }
- version: "0.3.26"
license_type: NOTICE
last_upgrade_date {
- year: 2023
+ year: 2024
month: 2
- day: 15
+ day: 1
+ }
+ homepage: "https://crates.io/crates/futures"
+ identifier {
+ type: "Archive"
+ value: "https://static.crates.io/crates/futures/futures-0.3.30.crate"
+ version: "0.3.30"
}
}
diff --git a/README.md b/README.md
index 45e1f5b..355d607 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ Add this to your `Cargo.toml`:
futures = "0.3"
```
-The current `futures` requires Rust 1.45 or later.
+The current `futures` requires Rust 1.56 or later.
### Feature `std`
diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs
index 5fc0f7d..004fda1 100644
--- a/tests/auto_traits.rs
+++ b/tests/auto_traits.rs
@@ -18,6 +18,8 @@ pub type SendFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Send>>;
pub type SendTryFuture<T = *const (), E = *const ()> = SendFuture<Result<T, E>>;
pub type SyncFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Sync>>;
pub type SyncTryFuture<T = *const (), E = *const ()> = SyncFuture<Result<T, E>>;
+pub type SendSyncFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Send + Sync>>;
+pub type SendSyncTryFuture<T = *const (), E = *const ()> = SendSyncFuture<Result<T, E>>;
pub type UnpinFuture<T = PhantomPinned> = LocalFuture<T>;
pub type UnpinTryFuture<T = PhantomPinned, E = PhantomPinned> = UnpinFuture<Result<T, E>>;
pub struct PinnedFuture<T = PhantomPinned>(PhantomPinned, PhantomData<T>);
@@ -35,6 +37,8 @@ pub type SendStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Send>>;
pub type SendTryStream<T = *const (), E = *const ()> = SendStream<Result<T, E>>;
pub type SyncStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Sync>>;
pub type SyncTryStream<T = *const (), E = *const ()> = SyncStream<Result<T, E>>;
+pub type SendSyncStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>;
+pub type SendSyncTryStream<T = *const (), E = *const ()> = SendSyncStream<Result<T, E>>;
pub type UnpinStream<T = PhantomPinned> = LocalStream<T>;
pub type UnpinTryStream<T = PhantomPinned, E = PhantomPinned> = UnpinStream<Result<T, E>>;
pub struct PinnedStream<T = PhantomPinned>(PhantomPinned, PhantomData<T>);
@@ -365,9 +369,10 @@ pub mod future {
assert_impl!(JoinAll<SendFuture<()>>: Send);
assert_not_impl!(JoinAll<LocalFuture>: Send);
assert_not_impl!(JoinAll<SendFuture>: Send);
- assert_impl!(JoinAll<SyncFuture<()>>: Sync);
- assert_not_impl!(JoinAll<LocalFuture>: Sync);
- assert_not_impl!(JoinAll<SyncFuture>: Sync);
+ assert_impl!(JoinAll<SendSyncFuture<()>>: Sync);
+ assert_not_impl!(JoinAll<SendFuture<()>>: Sync);
+ assert_not_impl!(JoinAll<SyncFuture<()>>: Sync);
+ assert_not_impl!(JoinAll<SendSyncFuture>: Sync);
assert_impl!(JoinAll<PinnedFuture>: Unpin);
assert_impl!(Lazy<()>: Send);
@@ -579,9 +584,10 @@ pub mod future {
assert_impl!(TryJoinAll<SendTryFuture<(), ()>>: Send);
assert_not_impl!(TryJoinAll<LocalTryFuture>: Send);
assert_not_impl!(TryJoinAll<SendTryFuture>: Send);
- assert_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync);
- assert_not_impl!(TryJoinAll<LocalTryFuture>: Sync);
- assert_not_impl!(TryJoinAll<SyncTryFuture>: Sync);
+ assert_impl!(TryJoinAll<SendSyncTryFuture<(), ()>>: Sync);
+ assert_not_impl!(TryJoinAll<SendTryFuture<(), ()>>: Sync);
+ assert_not_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync);
+ assert_not_impl!(TryJoinAll<SendSyncTryFuture>: Sync);
assert_impl!(TryJoinAll<PinnedTryFuture>: Unpin);
assert_impl!(TrySelect<SendFuture, SendFuture>: Send);
@@ -1118,10 +1124,9 @@ pub mod stream {
assert_not_impl!(Buffered<SendStream<SendFuture>>: Send);
assert_not_impl!(Buffered<SendStream<LocalFuture>>: Send);
assert_not_impl!(Buffered<LocalStream<SendFuture<()>>>: Send);
- assert_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync);
- assert_not_impl!(Buffered<SyncStream<SyncFuture>>: Sync);
- assert_not_impl!(Buffered<SyncStream<LocalFuture>>: Sync);
- assert_not_impl!(Buffered<LocalStream<SyncFuture<()>>>: Sync);
+ assert_impl!(Buffered<SyncStream<SendSyncFuture<()>>>: Sync);
+ assert_not_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync);
+ assert_not_impl!(Buffered<LocalStream<SendSyncFuture<()>>>: Sync);
assert_impl!(Buffered<UnpinStream<PinnedFuture>>: Unpin);
assert_not_impl!(Buffered<PinnedStream<PinnedFuture>>: Unpin);
@@ -1303,9 +1308,10 @@ pub mod stream {
assert_impl!(FuturesOrdered<SendFuture<()>>: Send);
assert_not_impl!(FuturesOrdered<SendFuture>: Send);
assert_not_impl!(FuturesOrdered<SendFuture>: Send);
- assert_impl!(FuturesOrdered<SyncFuture<()>>: Sync);
- assert_not_impl!(FuturesOrdered<LocalFuture<()>>: Sync);
- assert_not_impl!(FuturesOrdered<LocalFuture<()>>: Sync);
+ assert_impl!(FuturesOrdered<SendSyncFuture<()>>: Sync);
+ assert_not_impl!(FuturesOrdered<SyncFuture<()>>: Sync);
+ assert_not_impl!(FuturesOrdered<SendFuture<()>>: Sync);
+ assert_not_impl!(FuturesOrdered<SendSyncFuture>: Sync);
assert_impl!(FuturesOrdered<PinnedFuture>: Unpin);
assert_impl!(FuturesUnordered<()>: Send);
@@ -1647,11 +1653,12 @@ pub mod stream {
assert_not_impl!(TryBuffered<SendTryStream<SendTryFuture<(), *const ()>>>: Send);
assert_not_impl!(TryBuffered<SendTryStream<LocalTryFuture<(), ()>>>: Send);
assert_not_impl!(TryBuffered<LocalTryStream<SendTryFuture<(), ()>>>: Send);
- assert_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), ()>>>: Sync);
- assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<*const (), ()>>>: Sync);
- assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), *const ()>>>: Sync);
- assert_not_impl!(TryBuffered<SyncTryStream<LocalTryFuture<(), ()>>>: Sync);
- assert_not_impl!(TryBuffered<LocalTryStream<SyncTryFuture<(), ()>>>: Sync);
+ assert_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<(), ()>>>: Sync);
+ assert_not_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<*const (), ()>>>: Sync);
+ assert_not_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<(), *const ()>>>: Sync);
+ assert_not_impl!(TryBuffered<SyncTryStream<SendTryFuture<(), ()>>>: Sync);
+ assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), ()>>>: Sync);
+ assert_not_impl!(TryBuffered<LocalTryStream<SendSyncTryFuture<(), ()>>>: Sync);
assert_impl!(TryBuffered<UnpinTryStream<PinnedTryFuture>>: Unpin);
assert_not_impl!(TryBuffered<PinnedTryStream<UnpinTryFuture>>: Unpin);
diff --git a/tests_disabled/bilock.rs b/tests/bilock.rs
index 0166ca4..b103487 100644
--- a/tests_disabled/bilock.rs
+++ b/tests/bilock.rs
@@ -1,34 +1,38 @@
+#![cfg(feature = "bilock")]
+
+use futures::executor::block_on;
use futures::future;
use futures::stream;
-use futures::task;
+use futures::task::{Context, Poll};
+use futures::Future;
+use futures::StreamExt;
+use futures_test::task::noop_context;
use futures_util::lock::BiLock;
+use std::pin::Pin;
use std::thread;
-// mod support;
-// use support::*;
-
#[test]
fn smoke() {
- let future = future::lazy(|_| {
+ let future = future::lazy(|cx| {
let (a, b) = BiLock::new(1);
{
- let mut lock = match a.poll_lock() {
+ let mut lock = match a.poll_lock(cx) {
Poll::Ready(l) => l,
Poll::Pending => panic!("poll not ready"),
};
assert_eq!(*lock, 1);
*lock = 2;
- assert!(b.poll_lock().is_pending());
- assert!(a.poll_lock().is_pending());
+ assert!(b.poll_lock(cx).is_pending());
+ assert!(a.poll_lock(cx).is_pending());
}
- assert!(b.poll_lock().is_ready());
- assert!(a.poll_lock().is_ready());
+ assert!(b.poll_lock(cx).is_ready());
+ assert!(a.poll_lock(cx).is_ready());
{
- let lock = match b.poll_lock() {
+ let lock = match b.poll_lock(cx) {
Poll::Ready(l) => l,
Poll::Pending => panic!("poll not ready"),
};
@@ -40,34 +44,32 @@ fn smoke() {
Ok::<(), ()>(())
});
- assert!(task::spawn(future)
- .poll_future_notify(&notify_noop(), 0)
- .expect("failure in poll")
- .is_ready());
+ assert_eq!(block_on(future), Ok(()));
}
#[test]
fn concurrent() {
const N: usize = 10000;
+ let mut cx = noop_context();
let (a, b) = BiLock::new(0);
let a = Increment { a: Some(a), remaining: N };
- let b = stream::iter_ok(0..N).fold(b, |b, _n| {
- b.lock().map(|mut b| {
- *b += 1;
- b.unlock()
- })
+ let b = stream::iter(0..N).fold(b, |b, _n| async {
+ let mut g = b.lock().await;
+ *g += 1;
+ drop(g);
+ b
});
- let t1 = thread::spawn(move || a.wait());
- let b = b.wait().expect("b error");
- let a = t1.join().unwrap().expect("a error");
+ let t1 = thread::spawn(move || block_on(a));
+ let b = block_on(b);
+ let a = t1.join().unwrap();
- match a.poll_lock() {
+ match a.poll_lock(&mut cx) {
Poll::Ready(l) => assert_eq!(*l, 2 * N),
Poll::Pending => panic!("poll not ready"),
}
- match b.poll_lock() {
+ match b.poll_lock(&mut cx) {
Poll::Ready(l) => assert_eq!(*l, 2 * N),
Poll::Pending => panic!("poll not ready"),
}
@@ -80,22 +82,22 @@ fn concurrent() {
}
impl Future for Increment {
- type Item = BiLock<usize>;
- type Error = ();
+ type Output = BiLock<usize>;
- fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> {
loop {
if self.remaining == 0 {
- return Ok(self.a.take().unwrap().into());
+ return self.a.take().unwrap().into();
}
- let a = self.a.as_ref().unwrap();
- let mut a = match a.poll_lock() {
+ let a = self.a.as_mut().unwrap();
+ let mut a = match a.poll_lock(cx) {
Poll::Ready(l) => l,
- Poll::Pending => return Ok(Poll::Pending),
+ Poll::Pending => return Poll::Pending,
};
- self.remaining -= 1;
*a += 1;
+ drop(a);
+ self.remaining -= 1;
}
}
}
diff --git a/tests/stream.rs b/tests/stream.rs
index 5cde458..6cbef75 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -14,6 +14,7 @@ use futures::stream::{self, StreamExt};
use futures::task::Poll;
use futures::{ready, FutureExt};
use futures_core::Stream;
+use futures_executor::ThreadPool;
use futures_test::task::noop_context;
#[test]
@@ -65,6 +66,7 @@ fn flatten_unordered() {
use futures::task::*;
use std::convert::identity;
use std::pin::Pin;
+ use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
@@ -322,6 +324,78 @@ fn flatten_unordered() {
assert_eq!(values, (0..60).collect::<Vec<u8>>());
});
}
+
+ fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> {
+ let ready = Arc::new(AtomicBool::new(false));
+ let mut spawned = false;
+
+ future::poll_fn(move |cx| {
+ if !spawned {
+ let waker = cx.waker().clone();
+ let ready = ready.clone();
+
+ std::thread::spawn(move || {
+ std::thread::sleep(time);
+ ready.store(true, Ordering::Release);
+
+ waker.wake_by_ref()
+ });
+ spawned = true;
+ }
+
+ if ready.load(Ordering::Acquire) {
+ Poll::Ready(value.clone())
+ } else {
+ Poll::Pending
+ }
+ })
+ }
+
+ fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin
+ where
+ S::Item: Clone,
+ {
+ let inner = st
+ .then(|item| timeout(Duration::from_millis(50), item))
+ .enumerate()
+ .map(|(idx, value)| {
+ stream::once(if idx % 2 == 0 {
+ future::ready(value).left_future()
+ } else {
+ timeout(Duration::from_millis(100), value).right_future()
+ })
+ })
+ .flatten_unordered(None);
+
+ stream::once(future::ready(inner)).flatten_unordered(None)
+ }
+
+ // nested `flatten_unordered`
+ let te = ThreadPool::new().unwrap();
+ let base_handle = te
+ .spawn_with_handle(async move {
+ let fu = build_nested_fu(stream::iter(1..=10));
+
+ assert_eq!(fu.count().await, 10);
+ })
+ .unwrap();
+
+ block_on(base_handle);
+
+ let empty_state_move_handle = te
+ .spawn_with_handle(async move {
+ let mut fu = build_nested_fu(stream::iter(1..10));
+ {
+ let mut cx = noop_context();
+ let _ = fu.poll_next_unpin(&mut cx);
+ let _ = fu.poll_next_unpin(&mut cx);
+ }
+
+ assert_eq!(fu.count().await, 9);
+ })
+ .unwrap();
+
+ block_on(empty_state_move_handle);
}
#[test]
@@ -461,3 +535,43 @@ fn select_with_strategy_doesnt_terminate_early() {
assert_eq!(count.get(), times_should_poll + 1);
}
}
+
+async fn is_even(number: u8) -> bool {
+ number % 2 == 0
+}
+
+#[test]
+fn all() {
+ block_on(async {
+ let empty: [u8; 0] = [];
+ let st = stream::iter(empty);
+ let all = st.all(is_even).await;
+ assert!(all);
+
+ let st = stream::iter([2, 4, 6, 8]);
+ let all = st.all(is_even).await;
+ assert!(all);
+
+ let st = stream::iter([2, 3, 4]);
+ let all = st.all(is_even).await;
+ assert!(!all);
+ });
+}
+
+#[test]
+fn any() {
+ block_on(async {
+ let empty: [u8; 0] = [];
+ let st = stream::iter(empty);
+ let any = st.any(is_even).await;
+ assert!(!any);
+
+ let st = stream::iter([1, 2, 3]);
+ let any = st.any(is_even).await;
+ assert!(any);
+
+ let st = stream::iter([1, 3, 5]);
+ let any = st.any(is_even).await;
+ assert!(!any);
+ });
+}
diff --git a/tests/stream_futures_unordered.rs b/tests/stream_futures_unordered.rs
index b568280..7bdf543 100644
--- a/tests/stream_futures_unordered.rs
+++ b/tests/stream_futures_unordered.rs
@@ -381,3 +381,28 @@ fn clear() {
tasks.clear();
assert!(!tasks.is_terminated());
}
+
+// https://github.com/rust-lang/futures-rs/issues/2529#issuecomment-997290279
+#[test]
+fn clear_in_loop() {
+ const N: usize =
+ if cfg!(miri) || option_env!("QEMU_LD_PREFIX").is_some() { 100 } else { 10_000 };
+ futures::executor::block_on(async {
+ async fn task() {
+ let (s, r) = oneshot::channel();
+ std::thread::spawn(|| {
+ std::thread::sleep(std::time::Duration::from_micros(100));
+ let _ = s.send(());
+ });
+ r.await.unwrap()
+ }
+ let mut futures = FuturesUnordered::new();
+ for _ in 0..N {
+ for _ in 0..24 {
+ futures.push(task());
+ }
+ let _ = futures.next().await;
+ futures.clear();
+ }
+ });
+}
diff --git a/tests/stream_try_stream.rs b/tests/stream_try_stream.rs
index 194e74d..ef38c51 100644
--- a/tests/stream_try_stream.rs
+++ b/tests/stream_try_stream.rs
@@ -1,7 +1,13 @@
+use core::pin::Pin;
+use std::convert::Infallible;
+
use futures::{
- stream::{self, StreamExt, TryStreamExt},
+ stream::{self, repeat, Repeat, StreamExt, TryStreamExt},
task::Poll,
+ Stream,
};
+use futures_executor::block_on;
+use futures_task::Context;
use futures_test::task::noop_context;
#[test]
@@ -36,3 +42,142 @@ fn try_take_while_after_err() {
.boxed();
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
}
+
+#[test]
+fn try_flatten_unordered() {
+ let test_st = stream::iter(1..7)
+ .map(|val: u32| {
+ if val % 2 == 0 {
+ Ok(stream::unfold((val, 1), |(val, pow)| async move {
+ Some((val.pow(pow), (val, pow + 1)))
+ })
+ .take(3)
+ .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) }))
+ } else {
+ Err(val)
+ }
+ })
+ .map_ok(Box::pin)
+ .try_flatten_unordered(None);
+
+ block_on(async move {
+ assert_eq!(
+ // All numbers can be divided by 16 and odds must be `Err`
+ // For all basic evens we must have powers from 1 to 3
+ vec![
+ Err(1),
+ Err(3),
+ Err(5),
+ Ok(2),
+ Ok(4),
+ Ok(6),
+ Ok(4),
+ Err(16),
+ Ok(36),
+ Ok(8),
+ Err(64),
+ Ok(216)
+ ],
+ test_st.collect::<Vec<_>>().await
+ )
+ });
+
+ #[derive(Clone, Debug)]
+ struct ErrorStream {
+ error_after: usize,
+ polled: usize,
+ }
+
+ impl Stream for ErrorStream {
+ type Item = Result<Repeat<Result<(), ()>>, ()>;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
+ if self.polled > self.error_after {
+ panic!("Polled after error");
+ } else {
+ let out =
+ if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) };
+ self.polled += 1;
+ Poll::Ready(Some(out))
+ }
+ }
+ }
+
+ block_on(async move {
+ let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
+ let mut ctr = 0;
+ while (st.try_next().await).is_ok() {
+ ctr += 1;
+ }
+ assert_eq!(ctr, 0);
+
+ assert_eq!(
+ ErrorStream { error_after: 10, polled: 0 }
+ .try_flatten_unordered(None)
+ .inspect_ok(|_| panic!("Unexpected `Ok`"))
+ .try_collect::<Vec<_>>()
+ .await,
+ Err(())
+ );
+
+ let mut taken = 0;
+ assert_eq!(
+ ErrorStream { error_after: 10, polled: 0 }
+ .map_ok(|st| st.take(3))
+ .try_flatten_unordered(1)
+ .inspect(|_| taken += 1)
+ .try_fold((), |(), res| async move { Ok(res) })
+ .await,
+ Err(())
+ );
+ assert_eq!(taken, 31);
+ })
+}
+
+async fn is_even(number: u8) -> bool {
+ number % 2 == 0
+}
+
+#[test]
+fn try_all() {
+ block_on(async {
+ let empty: [Result<u8, Infallible>; 0] = [];
+ let st = stream::iter(empty);
+ let all = st.try_all(is_even).await;
+ assert_eq!(Ok(true), all);
+
+ let st = stream::iter([Ok::<_, Infallible>(2), Ok(4), Ok(6), Ok(8)]);
+ let all = st.try_all(is_even).await;
+ assert_eq!(Ok(true), all);
+
+ let st = stream::iter([Ok::<_, Infallible>(2), Ok(3), Ok(4)]);
+ let all = st.try_all(is_even).await;
+ assert_eq!(Ok(false), all);
+
+ let st = stream::iter([Ok(2), Ok(4), Err("err"), Ok(8)]);
+ let all = st.try_all(is_even).await;
+ assert_eq!(Err("err"), all);
+ });
+}
+
+#[test]
+fn try_any() {
+ block_on(async {
+ let empty: [Result<u8, Infallible>; 0] = [];
+ let st = stream::iter(empty);
+ let any = st.try_any(is_even).await;
+ assert_eq!(Ok(false), any);
+
+ let st = stream::iter([Ok::<_, Infallible>(1), Ok(2), Ok(3)]);
+ let any = st.try_any(is_even).await;
+ assert_eq!(Ok(true), any);
+
+ let st = stream::iter([Ok::<_, Infallible>(1), Ok(3), Ok(5)]);
+ let any = st.try_any(is_even).await;
+ assert_eq!(Ok(false), any);
+
+ let st = stream::iter([Ok(1), Ok(3), Err("err"), Ok(8)]);
+ let any = st.try_any(is_even).await;
+ assert_eq!(Err("err"), any);
+ });
+}