diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2024-02-01 18:53:32 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2024-02-01 18:53:32 +0000 |
commit | 6d23b35d43e81ef39cfbf1ca8b8846219fc8777e (patch) | |
tree | 454146e494166dac5ec269dc5ab7056118aff48c | |
parent | 81e706865418a818dbf657178f0e2e3414e126bd (diff) | |
parent | 88054eb6cb5d7f23ec2c8bce8be19ea760b93e30 (diff) | |
download | futures-master.tar.gz |
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures/+/2943533
Change-Id: Ie74f9821b96ef3a421d47ebc2ce3ebc4bb092174
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 2 | ||||
-rw-r--r-- | Cargo.toml | 18 | ||||
-rw-r--r-- | Cargo.toml.orig | 18 | ||||
-rw-r--r-- | METADATA | 23 | ||||
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | tests/auto_traits.rs | 43 | ||||
-rw-r--r-- | tests/bilock.rs (renamed from tests_disabled/bilock.rs) | 68 | ||||
-rw-r--r-- | tests/stream.rs | 114 | ||||
-rw-r--r-- | tests/stream_futures_unordered.rs | 25 | ||||
-rw-r--r-- | tests/stream_try_stream.rs | 147 |
11 files changed, 376 insertions, 86 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index c5e5d7e..f60a8e9 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "5e3693a350f96244151081d2c030208cd15f9572" + "sha1": "de1a0fd64a1bcae9a1534ed4da1699632993cc26" }, "path_in_vcs": "futures" }
\ No newline at end of file @@ -42,7 +42,7 @@ rust_library { host_supported: true, crate_name: "futures", cargo_env_compat: true, - cargo_pkg_version: "0.3.26", + cargo_pkg_version: "0.3.30", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -11,9 +11,9 @@ [package] edition = "2018" -rust-version = "1.45" +rust-version = "1.56" name = "futures" -version = "0.3.26" +version = "0.3.30" description = """ An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces. @@ -47,33 +47,33 @@ features = [ ] [dependencies.futures-channel] -version = "0.3.26" +version = "0.3.30" features = ["sink"] default-features = false [dependencies.futures-core] -version = "0.3.26" +version = "0.3.30" default-features = false [dependencies.futures-executor] -version = "0.3.26" +version = "0.3.30" optional = true default-features = false [dependencies.futures-io] -version = "0.3.26" +version = "0.3.30" default-features = false [dependencies.futures-sink] -version = "0.3.26" +version = "0.3.30" default-features = false [dependencies.futures-task] -version = "0.3.26" +version = "0.3.30" default-features = false [dependencies.futures-util] -version = "0.3.26" +version = "0.3.30" features = ["sink"] default-features = false diff --git a/Cargo.toml.orig b/Cargo.toml.orig index e7a5f38..6208f61 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,8 +1,8 @@ [package] name = "futures" -version = "0.3.26" +version = "0.3.30" edition = "2018" -rust-version = "1.45" +rust-version = "1.56" license = "MIT OR Apache-2.0" readme = "../README.md" keywords = ["futures", "async", "future"] @@ -15,13 +15,13 @@ composability, and iterator-like interfaces. categories = ["asynchronous"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.26", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.26", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.26", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.26", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.26", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.30", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.30", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.30", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.30", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.30", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.30", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.30", default-features = false, features = ["sink"] } [dev-dependencies] futures-executor = { path = "../futures-executor", features = ["thread-pool"] } @@ -1,23 +1,20 @@ # This project was upgraded with external_updater. -# Usage: tools/external_updater/updater.sh update rust/crates/futures -# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md +# Usage: tools/external_updater/updater.sh update external/rust/crates/futures +# For more info, check https://cs.android.com/android/platform/superproject/+/main:tools/external_updater/README.md name: "futures" description: "An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces." third_party { - url { - type: HOMEPAGE - value: "https://crates.io/crates/futures" - } - url { - type: ARCHIVE - value: "https://static.crates.io/crates/futures/futures-0.3.26.crate" - } - version: "0.3.26" license_type: NOTICE last_upgrade_date { - year: 2023 + year: 2024 month: 2 - day: 15 + day: 1 + } + homepage: "https://crates.io/crates/futures" + identifier { + type: "Archive" + value: "https://static.crates.io/crates/futures/futures-0.3.30.crate" + version: "0.3.30" } } @@ -38,7 +38,7 @@ Add this to your `Cargo.toml`: futures = "0.3" ``` -The current `futures` requires Rust 1.45 or later. +The current `futures` requires Rust 1.56 or later. ### Feature `std` diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs index 5fc0f7d..004fda1 100644 --- a/tests/auto_traits.rs +++ b/tests/auto_traits.rs @@ -18,6 +18,8 @@ pub type SendFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Send>>; pub type SendTryFuture<T = *const (), E = *const ()> = SendFuture<Result<T, E>>; pub type SyncFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Sync>>; pub type SyncTryFuture<T = *const (), E = *const ()> = SyncFuture<Result<T, E>>; +pub type SendSyncFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Send + Sync>>; +pub type SendSyncTryFuture<T = *const (), E = *const ()> = SendSyncFuture<Result<T, E>>; pub type UnpinFuture<T = PhantomPinned> = LocalFuture<T>; pub type UnpinTryFuture<T = PhantomPinned, E = PhantomPinned> = UnpinFuture<Result<T, E>>; pub struct PinnedFuture<T = PhantomPinned>(PhantomPinned, PhantomData<T>); @@ -35,6 +37,8 @@ pub type SendStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Send>>; pub type SendTryStream<T = *const (), E = *const ()> = SendStream<Result<T, E>>; pub type SyncStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Sync>>; pub type SyncTryStream<T = *const (), E = *const ()> = SyncStream<Result<T, E>>; +pub type SendSyncStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Send + Sync>>; +pub type SendSyncTryStream<T = *const (), E = *const ()> = SendSyncStream<Result<T, E>>; pub type UnpinStream<T = PhantomPinned> = LocalStream<T>; pub type UnpinTryStream<T = PhantomPinned, E = PhantomPinned> = UnpinStream<Result<T, E>>; pub struct PinnedStream<T = PhantomPinned>(PhantomPinned, PhantomData<T>); @@ -365,9 +369,10 @@ pub mod future { assert_impl!(JoinAll<SendFuture<()>>: Send); assert_not_impl!(JoinAll<LocalFuture>: Send); assert_not_impl!(JoinAll<SendFuture>: Send); - assert_impl!(JoinAll<SyncFuture<()>>: Sync); - assert_not_impl!(JoinAll<LocalFuture>: Sync); - assert_not_impl!(JoinAll<SyncFuture>: Sync); + assert_impl!(JoinAll<SendSyncFuture<()>>: Sync); + assert_not_impl!(JoinAll<SendFuture<()>>: Sync); + assert_not_impl!(JoinAll<SyncFuture<()>>: Sync); + assert_not_impl!(JoinAll<SendSyncFuture>: Sync); assert_impl!(JoinAll<PinnedFuture>: Unpin); assert_impl!(Lazy<()>: Send); @@ -579,9 +584,10 @@ pub mod future { assert_impl!(TryJoinAll<SendTryFuture<(), ()>>: Send); assert_not_impl!(TryJoinAll<LocalTryFuture>: Send); assert_not_impl!(TryJoinAll<SendTryFuture>: Send); - assert_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync); - assert_not_impl!(TryJoinAll<LocalTryFuture>: Sync); - assert_not_impl!(TryJoinAll<SyncTryFuture>: Sync); + assert_impl!(TryJoinAll<SendSyncTryFuture<(), ()>>: Sync); + assert_not_impl!(TryJoinAll<SendTryFuture<(), ()>>: Sync); + assert_not_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync); + assert_not_impl!(TryJoinAll<SendSyncTryFuture>: Sync); assert_impl!(TryJoinAll<PinnedTryFuture>: Unpin); assert_impl!(TrySelect<SendFuture, SendFuture>: Send); @@ -1118,10 +1124,9 @@ pub mod stream { assert_not_impl!(Buffered<SendStream<SendFuture>>: Send); assert_not_impl!(Buffered<SendStream<LocalFuture>>: Send); assert_not_impl!(Buffered<LocalStream<SendFuture<()>>>: Send); - assert_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync); - assert_not_impl!(Buffered<SyncStream<SyncFuture>>: Sync); - assert_not_impl!(Buffered<SyncStream<LocalFuture>>: Sync); - assert_not_impl!(Buffered<LocalStream<SyncFuture<()>>>: Sync); + assert_impl!(Buffered<SyncStream<SendSyncFuture<()>>>: Sync); + assert_not_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync); + assert_not_impl!(Buffered<LocalStream<SendSyncFuture<()>>>: Sync); assert_impl!(Buffered<UnpinStream<PinnedFuture>>: Unpin); assert_not_impl!(Buffered<PinnedStream<PinnedFuture>>: Unpin); @@ -1303,9 +1308,10 @@ pub mod stream { assert_impl!(FuturesOrdered<SendFuture<()>>: Send); assert_not_impl!(FuturesOrdered<SendFuture>: Send); assert_not_impl!(FuturesOrdered<SendFuture>: Send); - assert_impl!(FuturesOrdered<SyncFuture<()>>: Sync); - assert_not_impl!(FuturesOrdered<LocalFuture<()>>: Sync); - assert_not_impl!(FuturesOrdered<LocalFuture<()>>: Sync); + assert_impl!(FuturesOrdered<SendSyncFuture<()>>: Sync); + assert_not_impl!(FuturesOrdered<SyncFuture<()>>: Sync); + assert_not_impl!(FuturesOrdered<SendFuture<()>>: Sync); + assert_not_impl!(FuturesOrdered<SendSyncFuture>: Sync); assert_impl!(FuturesOrdered<PinnedFuture>: Unpin); assert_impl!(FuturesUnordered<()>: Send); @@ -1647,11 +1653,12 @@ pub mod stream { assert_not_impl!(TryBuffered<SendTryStream<SendTryFuture<(), *const ()>>>: Send); assert_not_impl!(TryBuffered<SendTryStream<LocalTryFuture<(), ()>>>: Send); assert_not_impl!(TryBuffered<LocalTryStream<SendTryFuture<(), ()>>>: Send); - assert_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), ()>>>: Sync); - assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<*const (), ()>>>: Sync); - assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), *const ()>>>: Sync); - assert_not_impl!(TryBuffered<SyncTryStream<LocalTryFuture<(), ()>>>: Sync); - assert_not_impl!(TryBuffered<LocalTryStream<SyncTryFuture<(), ()>>>: Sync); + assert_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<(), ()>>>: Sync); + assert_not_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<*const (), ()>>>: Sync); + assert_not_impl!(TryBuffered<SyncTryStream<SendSyncTryFuture<(), *const ()>>>: Sync); + assert_not_impl!(TryBuffered<SyncTryStream<SendTryFuture<(), ()>>>: Sync); + assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), ()>>>: Sync); + assert_not_impl!(TryBuffered<LocalTryStream<SendSyncTryFuture<(), ()>>>: Sync); assert_impl!(TryBuffered<UnpinTryStream<PinnedTryFuture>>: Unpin); assert_not_impl!(TryBuffered<PinnedTryStream<UnpinTryFuture>>: Unpin); diff --git a/tests_disabled/bilock.rs b/tests/bilock.rs index 0166ca4..b103487 100644 --- a/tests_disabled/bilock.rs +++ b/tests/bilock.rs @@ -1,34 +1,38 @@ +#![cfg(feature = "bilock")] + +use futures::executor::block_on; use futures::future; use futures::stream; -use futures::task; +use futures::task::{Context, Poll}; +use futures::Future; +use futures::StreamExt; +use futures_test::task::noop_context; use futures_util::lock::BiLock; +use std::pin::Pin; use std::thread; -// mod support; -// use support::*; - #[test] fn smoke() { - let future = future::lazy(|_| { + let future = future::lazy(|cx| { let (a, b) = BiLock::new(1); { - let mut lock = match a.poll_lock() { + let mut lock = match a.poll_lock(cx) { Poll::Ready(l) => l, Poll::Pending => panic!("poll not ready"), }; assert_eq!(*lock, 1); *lock = 2; - assert!(b.poll_lock().is_pending()); - assert!(a.poll_lock().is_pending()); + assert!(b.poll_lock(cx).is_pending()); + assert!(a.poll_lock(cx).is_pending()); } - assert!(b.poll_lock().is_ready()); - assert!(a.poll_lock().is_ready()); + assert!(b.poll_lock(cx).is_ready()); + assert!(a.poll_lock(cx).is_ready()); { - let lock = match b.poll_lock() { + let lock = match b.poll_lock(cx) { Poll::Ready(l) => l, Poll::Pending => panic!("poll not ready"), }; @@ -40,34 +44,32 @@ fn smoke() { Ok::<(), ()>(()) }); - assert!(task::spawn(future) - .poll_future_notify(¬ify_noop(), 0) - .expect("failure in poll") - .is_ready()); + assert_eq!(block_on(future), Ok(())); } #[test] fn concurrent() { const N: usize = 10000; + let mut cx = noop_context(); let (a, b) = BiLock::new(0); let a = Increment { a: Some(a), remaining: N }; - let b = stream::iter_ok(0..N).fold(b, |b, _n| { - b.lock().map(|mut b| { - *b += 1; - b.unlock() - }) + let b = stream::iter(0..N).fold(b, |b, _n| async { + let mut g = b.lock().await; + *g += 1; + drop(g); + b }); - let t1 = thread::spawn(move || a.wait()); - let b = b.wait().expect("b error"); - let a = t1.join().unwrap().expect("a error"); + let t1 = thread::spawn(move || block_on(a)); + let b = block_on(b); + let a = t1.join().unwrap(); - match a.poll_lock() { + match a.poll_lock(&mut cx) { Poll::Ready(l) => assert_eq!(*l, 2 * N), Poll::Pending => panic!("poll not ready"), } - match b.poll_lock() { + match b.poll_lock(&mut cx) { Poll::Ready(l) => assert_eq!(*l, 2 * N), Poll::Pending => panic!("poll not ready"), } @@ -80,22 +82,22 @@ fn concurrent() { } impl Future for Increment { - type Item = BiLock<usize>; - type Error = (); + type Output = BiLock<usize>; - fn poll(&mut self) -> Poll<BiLock<usize>, ()> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> { loop { if self.remaining == 0 { - return Ok(self.a.take().unwrap().into()); + return self.a.take().unwrap().into(); } - let a = self.a.as_ref().unwrap(); - let mut a = match a.poll_lock() { + let a = self.a.as_mut().unwrap(); + let mut a = match a.poll_lock(cx) { Poll::Ready(l) => l, - Poll::Pending => return Ok(Poll::Pending), + Poll::Pending => return Poll::Pending, }; - self.remaining -= 1; *a += 1; + drop(a); + self.remaining -= 1; } } } diff --git a/tests/stream.rs b/tests/stream.rs index 5cde458..6cbef75 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -14,6 +14,7 @@ use futures::stream::{self, StreamExt}; use futures::task::Poll; use futures::{ready, FutureExt}; use futures_core::Stream; +use futures_executor::ThreadPool; use futures_test::task::noop_context; #[test] @@ -65,6 +66,7 @@ fn flatten_unordered() { use futures::task::*; use std::convert::identity; use std::pin::Pin; + use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::Duration; @@ -322,6 +324,78 @@ fn flatten_unordered() { assert_eq!(values, (0..60).collect::<Vec<u8>>()); }); } + + fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> { + let ready = Arc::new(AtomicBool::new(false)); + let mut spawned = false; + + future::poll_fn(move |cx| { + if !spawned { + let waker = cx.waker().clone(); + let ready = ready.clone(); + + std::thread::spawn(move || { + std::thread::sleep(time); + ready.store(true, Ordering::Release); + + waker.wake_by_ref() + }); + spawned = true; + } + + if ready.load(Ordering::Acquire) { + Poll::Ready(value.clone()) + } else { + Poll::Pending + } + }) + } + + fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin + where + S::Item: Clone, + { + let inner = st + .then(|item| timeout(Duration::from_millis(50), item)) + .enumerate() + .map(|(idx, value)| { + stream::once(if idx % 2 == 0 { + future::ready(value).left_future() + } else { + timeout(Duration::from_millis(100), value).right_future() + }) + }) + .flatten_unordered(None); + + stream::once(future::ready(inner)).flatten_unordered(None) + } + + // nested `flatten_unordered` + let te = ThreadPool::new().unwrap(); + let base_handle = te + .spawn_with_handle(async move { + let fu = build_nested_fu(stream::iter(1..=10)); + + assert_eq!(fu.count().await, 10); + }) + .unwrap(); + + block_on(base_handle); + + let empty_state_move_handle = te + .spawn_with_handle(async move { + let mut fu = build_nested_fu(stream::iter(1..10)); + { + let mut cx = noop_context(); + let _ = fu.poll_next_unpin(&mut cx); + let _ = fu.poll_next_unpin(&mut cx); + } + + assert_eq!(fu.count().await, 9); + }) + .unwrap(); + + block_on(empty_state_move_handle); } #[test] @@ -461,3 +535,43 @@ fn select_with_strategy_doesnt_terminate_early() { assert_eq!(count.get(), times_should_poll + 1); } } + +async fn is_even(number: u8) -> bool { + number % 2 == 0 +} + +#[test] +fn all() { + block_on(async { + let empty: [u8; 0] = []; + let st = stream::iter(empty); + let all = st.all(is_even).await; + assert!(all); + + let st = stream::iter([2, 4, 6, 8]); + let all = st.all(is_even).await; + assert!(all); + + let st = stream::iter([2, 3, 4]); + let all = st.all(is_even).await; + assert!(!all); + }); +} + +#[test] +fn any() { + block_on(async { + let empty: [u8; 0] = []; + let st = stream::iter(empty); + let any = st.any(is_even).await; + assert!(!any); + + let st = stream::iter([1, 2, 3]); + let any = st.any(is_even).await; + assert!(any); + + let st = stream::iter([1, 3, 5]); + let any = st.any(is_even).await; + assert!(!any); + }); +} diff --git a/tests/stream_futures_unordered.rs b/tests/stream_futures_unordered.rs index b568280..7bdf543 100644 --- a/tests/stream_futures_unordered.rs +++ b/tests/stream_futures_unordered.rs @@ -381,3 +381,28 @@ fn clear() { tasks.clear(); assert!(!tasks.is_terminated()); } + +// https://github.com/rust-lang/futures-rs/issues/2529#issuecomment-997290279 +#[test] +fn clear_in_loop() { + const N: usize = + if cfg!(miri) || option_env!("QEMU_LD_PREFIX").is_some() { 100 } else { 10_000 }; + futures::executor::block_on(async { + async fn task() { + let (s, r) = oneshot::channel(); + std::thread::spawn(|| { + std::thread::sleep(std::time::Duration::from_micros(100)); + let _ = s.send(()); + }); + r.await.unwrap() + } + let mut futures = FuturesUnordered::new(); + for _ in 0..N { + for _ in 0..24 { + futures.push(task()); + } + let _ = futures.next().await; + futures.clear(); + } + }); +} diff --git a/tests/stream_try_stream.rs b/tests/stream_try_stream.rs index 194e74d..ef38c51 100644 --- a/tests/stream_try_stream.rs +++ b/tests/stream_try_stream.rs @@ -1,7 +1,13 @@ +use core::pin::Pin; +use std::convert::Infallible; + use futures::{ - stream::{self, StreamExt, TryStreamExt}, + stream::{self, repeat, Repeat, StreamExt, TryStreamExt}, task::Poll, + Stream, }; +use futures_executor::block_on; +use futures_task::Context; use futures_test::task::noop_context; #[test] @@ -36,3 +42,142 @@ fn try_take_while_after_err() { .boxed(); assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); } + +#[test] +fn try_flatten_unordered() { + let test_st = stream::iter(1..7) + .map(|val: u32| { + if val % 2 == 0 { + Ok(stream::unfold((val, 1), |(val, pow)| async move { + Some((val.pow(pow), (val, pow + 1))) + }) + .take(3) + .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) })) + } else { + Err(val) + } + }) + .map_ok(Box::pin) + .try_flatten_unordered(None); + + block_on(async move { + assert_eq!( + // All numbers can be divided by 16 and odds must be `Err` + // For all basic evens we must have powers from 1 to 3 + vec![ + Err(1), + Err(3), + Err(5), + Ok(2), + Ok(4), + Ok(6), + Ok(4), + Err(16), + Ok(36), + Ok(8), + Err(64), + Ok(216) + ], + test_st.collect::<Vec<_>>().await + ) + }); + + #[derive(Clone, Debug)] + struct ErrorStream { + error_after: usize, + polled: usize, + } + + impl Stream for ErrorStream { + type Item = Result<Repeat<Result<(), ()>>, ()>; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> { + if self.polled > self.error_after { + panic!("Polled after error"); + } else { + let out = + if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) }; + self.polled += 1; + Poll::Ready(Some(out)) + } + } + } + + block_on(async move { + let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None); + let mut ctr = 0; + while (st.try_next().await).is_ok() { + ctr += 1; + } + assert_eq!(ctr, 0); + + assert_eq!( + ErrorStream { error_after: 10, polled: 0 } + .try_flatten_unordered(None) + .inspect_ok(|_| panic!("Unexpected `Ok`")) + .try_collect::<Vec<_>>() + .await, + Err(()) + ); + + let mut taken = 0; + assert_eq!( + ErrorStream { error_after: 10, polled: 0 } + .map_ok(|st| st.take(3)) + .try_flatten_unordered(1) + .inspect(|_| taken += 1) + .try_fold((), |(), res| async move { Ok(res) }) + .await, + Err(()) + ); + assert_eq!(taken, 31); + }) +} + +async fn is_even(number: u8) -> bool { + number % 2 == 0 +} + +#[test] +fn try_all() { + block_on(async { + let empty: [Result<u8, Infallible>; 0] = []; + let st = stream::iter(empty); + let all = st.try_all(is_even).await; + assert_eq!(Ok(true), all); + + let st = stream::iter([Ok::<_, Infallible>(2), Ok(4), Ok(6), Ok(8)]); + let all = st.try_all(is_even).await; + assert_eq!(Ok(true), all); + + let st = stream::iter([Ok::<_, Infallible>(2), Ok(3), Ok(4)]); + let all = st.try_all(is_even).await; + assert_eq!(Ok(false), all); + + let st = stream::iter([Ok(2), Ok(4), Err("err"), Ok(8)]); + let all = st.try_all(is_even).await; + assert_eq!(Err("err"), all); + }); +} + +#[test] +fn try_any() { + block_on(async { + let empty: [Result<u8, Infallible>; 0] = []; + let st = stream::iter(empty); + let any = st.try_any(is_even).await; + assert_eq!(Ok(false), any); + + let st = stream::iter([Ok::<_, Infallible>(1), Ok(2), Ok(3)]); + let any = st.try_any(is_even).await; + assert_eq!(Ok(true), any); + + let st = stream::iter([Ok::<_, Infallible>(1), Ok(3), Ok(5)]); + let any = st.try_any(is_even).await; + assert_eq!(Ok(false), any); + + let st = stream::iter([Ok(1), Ok(3), Err("err"), Ok(8)]); + let any = st.try_any(is_even).await; + assert_eq!(Err("err"), any); + }); +} |