diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:03:29 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:03:29 +0000 |
commit | ec2853e697ebfe66828610e926981841e4879803 (patch) | |
tree | 7b826874213015b98b9e0383a89c88be66b64b11 /src/future | |
parent | 5a2ead4f79e3b4c604edd450714bd1d925de1d4e (diff) | |
parent | e177aca7d9e335e8989e592627752459e00df418 (diff) | |
download | futures-util-b94cc39404d16f92b2d950731f1f4c7866b5f4e7.tar.gz |
Snap for 8564071 from e177aca7d9e335e8989e592627752459e00df418 to mainline-tethering-releaseaml_tet_331910040aml_tet_331820050aml_tet_331711040aml_tet_331511160aml_tet_331511000aml_tet_331412030aml_tet_331312080aml_tet_331117000aml_tet_331012080aml_tet_330911010aml_tet_330812150android13-mainline-tethering-release
Change-Id: I94036737fcc6a3b9c10172541508602e2eda7fb2
Diffstat (limited to 'src/future')
-rw-r--r-- | src/future/abortable.rs | 170 | ||||
-rw-r--r-- | src/future/either.rs | 31 | ||||
-rw-r--r-- | src/future/future/catch_unwind.rs | 10 | ||||
-rw-r--r-- | src/future/future/flatten.rs | 47 | ||||
-rw-r--r-- | src/future/future/fuse.rs | 4 | ||||
-rw-r--r-- | src/future/future/remote_handle.rs | 14 | ||||
-rw-r--r-- | src/future/future/shared.rs | 35 | ||||
-rw-r--r-- | src/future/join.rs | 11 | ||||
-rw-r--r-- | src/future/join_all.rs | 109 | ||||
-rw-r--r-- | src/future/lazy.rs | 15 | ||||
-rw-r--r-- | src/future/mod.rs | 24 | ||||
-rw-r--r-- | src/future/option.rs | 13 | ||||
-rw-r--r-- | src/future/pending.rs | 7 | ||||
-rw-r--r-- | src/future/poll_fn.rs | 5 | ||||
-rw-r--r-- | src/future/poll_immediate.rs | 126 | ||||
-rw-r--r-- | src/future/select.rs | 18 | ||||
-rw-r--r-- | src/future/select_all.rs | 26 | ||||
-rw-r--r-- | src/future/select_ok.rs | 29 | ||||
-rw-r--r-- | src/future/try_future/into_future.rs | 9 | ||||
-rw-r--r-- | src/future/try_future/try_flatten.rs | 77 | ||||
-rw-r--r-- | src/future/try_future/try_flatten_err.rs | 24 | ||||
-rw-r--r-- | src/future/try_join_all.rs | 31 | ||||
-rw-r--r-- | src/future/try_maybe_done.rs | 18 | ||||
-rw-r--r-- | src/future/try_select.rs | 25 |
24 files changed, 441 insertions, 437 deletions
diff --git a/src/future/abortable.rs b/src/future/abortable.rs index 3f2e5a0..d017ab7 100644 --- a/src/future/abortable.rs +++ b/src/future/abortable.rs @@ -1,110 +1,8 @@ use super::assert_future; -use crate::task::AtomicWaker; +use crate::future::{AbortHandle, Abortable, Aborted}; use futures_core::future::Future; -use futures_core::task::{Context, Poll}; -use core::fmt; -use core::pin::Pin; -use core::sync::atomic::{AtomicBool, Ordering}; -use alloc::sync::Arc; -use pin_project_lite::pin_project; -pin_project! { - /// A future which can be remotely short-circuited using an `AbortHandle`. - #[derive(Debug, Clone)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct Abortable<Fut> { - #[pin] - future: Fut, - inner: Arc<AbortInner>, - } -} - -impl<Fut> Abortable<Fut> where Fut: Future { - /// Creates a new `Abortable` future using an existing `AbortRegistration`. - /// `AbortRegistration`s can be acquired through `AbortHandle::new`. - /// - /// When `abort` is called on the handle tied to `reg` or if `abort` has - /// already been called, the future will complete immediately without making - /// any further progress. - /// - /// Example: - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::future::{Abortable, AbortHandle, Aborted}; - /// - /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); - /// let future = Abortable::new(async { 2 }, abort_registration); - /// abort_handle.abort(); - /// assert_eq!(future.await, Err(Aborted)); - /// # }); - /// ``` - pub fn new(future: Fut, reg: AbortRegistration) -> Self { - assert_future::<Result<Fut::Output, Aborted>, _>(Self { - future, - inner: reg.inner, - }) - } -} - -/// A registration handle for a `Abortable` future. -/// Values of this type can be acquired from `AbortHandle::new` and are used -/// in calls to `Abortable::new`. -#[derive(Debug)] -pub struct AbortRegistration { - inner: Arc<AbortInner>, -} - -/// A handle to a `Abortable` future. -#[derive(Debug, Clone)] -pub struct AbortHandle { - inner: Arc<AbortInner>, -} - -impl AbortHandle { - /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used - /// to abort a running future. - /// - /// This function is usually paired with a call to `Abortable::new`. - /// - /// Example: - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::future::{Abortable, AbortHandle, Aborted}; - /// - /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); - /// let future = Abortable::new(async { 2 }, abort_registration); - /// abort_handle.abort(); - /// assert_eq!(future.await, Err(Aborted)); - /// # }); - /// ``` - pub fn new_pair() -> (Self, AbortRegistration) { - let inner = Arc::new(AbortInner { - waker: AtomicWaker::new(), - cancel: AtomicBool::new(false), - }); - - ( - Self { - inner: inner.clone(), - }, - AbortRegistration { - inner, - }, - ) - } -} - -// Inner type storing the waker to awaken and a bool indicating that it -// should be cancelled. -#[derive(Debug)] -struct AbortInner { - waker: AtomicWaker, - cancel: AtomicBool, -} - -/// Creates a new `Abortable` future and a `AbortHandle` which can be used to stop it. +/// Creates a new `Abortable` future and an `AbortHandle` which can be used to stop it. /// /// This function is a convenient (but less flexible) alternative to calling /// `AbortHandle::new` and `Abortable::new` manually. @@ -112,66 +10,10 @@ struct AbortInner { /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle) - where Fut: Future +where + Fut: Future, { let (handle, reg) = AbortHandle::new_pair(); - ( - Abortable::new(future, reg), - handle, - ) -} - -/// Indicator that the `Abortable` future was aborted. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub struct Aborted; - -impl fmt::Display for Aborted { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "`Abortable` future has been aborted") - } -} - -#[cfg(feature = "std")] -impl std::error::Error for Aborted {} - -impl<Fut> Future for Abortable<Fut> where Fut: Future { - type Output = Result<Fut::Output, Aborted>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - // Check if the future has been aborted - if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)) - } - - // attempt to complete the future - if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) { - return Poll::Ready(Ok(x)) - } - - // Register to receive a wakeup if the future is aborted in the... future - self.inner.waker.register(cx.waker()); - - // Check to see if the future was aborted between the first check and - // registration. - // Checking with `Relaxed` is sufficient because `register` introduces an - // `AcqRel` barrier. - if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)) - } - - Poll::Pending - } -} - -impl AbortHandle { - /// Abort the `Abortable` future associated with this handle. - /// - /// Notifies the Abortable future associated with this handle that it - /// should abort. Note that if the future is currently being polled on - /// another thread, it will not immediately stop running. Instead, it will - /// continue to run until its poll method returns. - pub fn abort(&self) { - self.inner.cancel.store(true, Ordering::Relaxed); - self.inner.waker.wake(); - } + let abortable = assert_future::<Result<Fut::Output, Aborted>, _>(Abortable::new(future, reg)); + (abortable, handle) } diff --git a/src/future/either.rs b/src/future/either.rs index 5f5b614..9602de7 100644 --- a/src/future/either.rs +++ b/src/future/either.rs @@ -5,8 +5,25 @@ use futures_core::stream::{FusedStream, Stream}; #[cfg(feature = "sink")] use futures_sink::Sink; -/// Combines two different futures, streams, or sinks having the same associated types into a single -/// type. +/// Combines two different futures, streams, or sinks having the same associated types into a single type. +/// +/// This is useful when conditionally choosing between two distinct future types: +/// +/// ```rust +/// use futures::future::Either; +/// +/// # futures::executor::block_on(async { +/// let cond = true; +/// +/// let fut = if cond { +/// Either::Left(async move { 12 }) +/// } else { +/// Either::Right(async move { 44 }) +/// }; +/// +/// assert_eq!(fut.await, 12); +/// # }) +/// ``` #[derive(Debug, Clone)] pub enum Either<A, B> { /// First branch of the type @@ -167,8 +184,6 @@ mod if_std { use core::pin::Pin; use core::task::{Context, Poll}; - #[cfg(feature = "read-initializer")] - use futures_io::Initializer; use futures_io::{ AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom, }; @@ -178,14 +193,6 @@ mod if_std { A: AsyncRead, B: AsyncRead, { - #[cfg(feature = "read-initializer")] - unsafe fn initializer(&self) -> Initializer { - match self { - Either::Left(x) => x.initializer(), - Either::Right(x) => x.initializer(), - } - } - fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/future/future/catch_unwind.rs b/src/future/future/catch_unwind.rs index 3f16577..0e09d6e 100644 --- a/src/future/future/catch_unwind.rs +++ b/src/future/future/catch_unwind.rs @@ -1,6 +1,6 @@ use core::any::Any; use core::pin::Pin; -use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; +use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}; use futures_core::future::Future; use futures_core::task::{Context, Poll}; @@ -16,14 +16,18 @@ pin_project! { } } -impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe { +impl<Fut> CatchUnwind<Fut> +where + Fut: Future + UnwindSafe, +{ pub(super) fn new(future: Fut) -> Self { Self { future } } } impl<Fut> Future for CatchUnwind<Fut> - where Fut: Future + UnwindSafe, +where + Fut: Future + UnwindSafe, { type Output = Result<Fut::Output, Box<dyn Any + Send>>; diff --git a/src/future/future/flatten.rs b/src/future/future/flatten.rs index 0c48a4f..bd767af 100644 --- a/src/future/future/flatten.rs +++ b/src/future/future/flatten.rs @@ -2,9 +2,9 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { @@ -24,8 +24,9 @@ impl<Fut1, Fut2> Flatten<Fut1, Fut2> { } impl<Fut> FusedFuture for Flatten<Fut, Fut::Output> - where Fut: Future, - Fut::Output: Future, +where + Fut: Future, + Fut::Output: Future, { fn is_terminated(&self) -> bool { match self { @@ -36,8 +37,9 @@ impl<Fut> FusedFuture for Flatten<Fut, Fut::Output> } impl<Fut> Future for Flatten<Fut, Fut::Output> - where Fut: Future, - Fut::Output: Future, +where + Fut: Future, + Fut::Output: Future, { type Output = <Fut::Output as Future>::Output; @@ -47,12 +49,12 @@ impl<Fut> Future for Flatten<Fut, Fut::Output> FlattenProj::First { f } => { let f = ready!(f.poll(cx)); self.set(Self::Second { f }); - }, + } FlattenProj::Second { f } => { let output = ready!(f.poll(cx)); self.set(Self::Empty); break output; - }, + } FlattenProj::Empty => panic!("Flatten polled after completion"), } }) @@ -60,8 +62,9 @@ impl<Fut> Future for Flatten<Fut, Fut::Output> } impl<Fut> FusedStream for Flatten<Fut, Fut::Output> - where Fut: Future, - Fut::Output: Stream, +where + Fut: Future, + Fut::Output: Stream, { fn is_terminated(&self) -> bool { match self { @@ -72,32 +75,32 @@ impl<Fut> FusedStream for Flatten<Fut, Fut::Output> } impl<Fut> Stream for Flatten<Fut, Fut::Output> - where Fut: Future, - Fut::Output: Stream, +where + Fut: Future, + Fut::Output: Stream, { type Item = <Fut::Output as Stream>::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(loop { match self.as_mut().project() { - FlattenProj::First { f } => { + FlattenProj::First { f } => { let f = ready!(f.poll(cx)); self.set(Self::Second { f }); - }, + } FlattenProj::Second { f } => { let output = ready!(f.poll_next(cx)); if output.is_none() { self.set(Self::Empty); } break output; - }, + } FlattenProj::Empty => break None, } }) } } - #[cfg(feature = "sink")] impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output> where @@ -106,19 +109,16 @@ where { type Error = <Fut::Output as Sink<Item>>::Error; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(loop { match self.as_mut().project() { FlattenProj::First { f } => { let f = ready!(f.poll(cx)); self.set(Self::Second { f }); - }, + } FlattenProj::Second { f } => { break ready!(f.poll_ready(cx)); - }, + } FlattenProj::Empty => panic!("poll_ready called after eof"), } }) @@ -140,10 +140,7 @@ where } } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { let res = match self.as_mut().project() { FlattenProj::Second { f } => f.poll_close(cx), _ => Poll::Ready(Ok(())), diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs index f4284ba..597aec1 100644 --- a/src/future/future/fuse.rs +++ b/src/future/future/fuse.rs @@ -1,5 +1,5 @@ use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::ready; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -86,7 +86,7 @@ impl<Fut: Future> Future for Fuse<Fut> { let output = ready!(fut.poll(cx)); self.project().inner.set(None); output - }, + } None => return Poll::Pending, }) } diff --git a/src/future/future/remote_handle.rs b/src/future/future/remote_handle.rs index 0d33ea5..1358902 100644 --- a/src/future/future/remote_handle.rs +++ b/src/future/future/remote_handle.rs @@ -1,23 +1,23 @@ use { crate::future::{CatchUnwind, FutureExt}, - futures_channel::oneshot::{self, Sender, Receiver}, + futures_channel::oneshot::{self, Receiver, Sender}, futures_core::{ future::Future, - task::{Context, Poll}, ready, + task::{Context, Poll}, }, + pin_project_lite::pin_project, std::{ any::Any, fmt, panic::{self, AssertUnwindSafe}, pin::Pin, sync::{ - Arc, atomic::{AtomicBool, Ordering}, + Arc, }, thread, }, - pin_project_lite::pin_project, }; /// The handle to a remote future returned by @@ -36,7 +36,7 @@ use { /// must be careful with regard to unwind safety because the thread in which the future /// is polled will keep running after the panic and the thread running the [RemoteHandle] /// will unwind. -#[must_use = "futures do nothing unless you `.await` or poll them"] +#[must_use = "dropping a remote handle cancels the underlying future"] #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] pub struct RemoteHandle<T> { @@ -85,9 +85,7 @@ pin_project! { impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("Remote") - .field(&self.future) - .finish() + f.debug_tuple("Remote").field(&self.future).finish() } } diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs index 74311a0..9b31932 100644 --- a/src/future/future/shared.rs +++ b/src/future/future/shared.rs @@ -29,6 +29,12 @@ struct Notifier { /// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`. pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>); +impl<Fut: Future> Clone for WeakShared<Fut> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + // The future itself is polled behind the `Arc`, so it won't be moved // when `Shared` is moved. impl<Fut: Future> Unpin for Shared<Fut> {} @@ -90,10 +96,7 @@ impl<Fut: Future> Shared<Fut> { }), }; - Self { - inner: Some(Arc::new(inner)), - waker_key: NULL_WAKER_KEY, - } + Self { inner: Some(Arc::new(inner)), waker_key: NULL_WAKER_KEY } } } @@ -223,10 +226,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = &mut *self; - let inner = this - .inner - .take() - .expect("Shared future polled again after completion"); + let inner = this.inner.take().expect("Shared future polled again after completion"); // Fast path for when the wrapped future has already completed if inner.notifier.state.load(Acquire) == COMPLETE { @@ -286,11 +286,7 @@ where match future.poll(&mut cx) { Poll::Pending => { - if inner - .notifier - .state - .compare_exchange(POLLING, IDLE, SeqCst, SeqCst) - .is_ok() + if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() { // Success drop(_reset); @@ -330,10 +326,7 @@ where Fut: Future, { fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - waker_key: NULL_WAKER_KEY, - } + Self { inner: self.inner.clone(), waker_key: NULL_WAKER_KEY } } } @@ -367,16 +360,12 @@ impl ArcWake for Notifier { } } -impl<Fut: Future> WeakShared<Fut> -{ +impl<Fut: Future> WeakShared<Fut> { /// Attempts to upgrade this [`WeakShared`] into a [`Shared`]. /// /// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled /// to completion. pub fn upgrade(&self) -> Option<Shared<Fut>> { - Some(Shared { - inner: Some(self.0.upgrade()?), - waker_key: NULL_WAKER_KEY, - }) + Some(Shared { inner: Some(self.0.upgrade()?), waker_key: NULL_WAKER_KEY }) } } diff --git a/src/future/join.rs b/src/future/join.rs index a818343..740ffbc 100644 --- a/src/future/join.rs +++ b/src/future/join.rs @@ -213,14 +213,5 @@ where Fut5: Future, { let f = Join5::new(future1, future2, future3, future4, future5); - assert_future::< - ( - Fut1::Output, - Fut2::Output, - Fut3::Output, - Fut4::Output, - Fut5::Output, - ), - _, - >(f) + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output, Fut5::Output), _>(f) } diff --git a/src/future/join_all.rs b/src/future/join_all.rs index 7ccf869..2e52ac1 100644 --- a/src/future/join_all.rs +++ b/src/future/join_all.rs @@ -1,33 +1,50 @@ //! Definition of the `JoinAll` combinator, waiting for all of a list of futures //! to finish. +use alloc::boxed::Box; +use alloc::vec::Vec; use core::fmt; use core::future::Future; use core::iter::FromIterator; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use alloc::boxed::Box; -use alloc::vec::Vec; -use super::{MaybeDone, assert_future}; +use super::{assert_future, MaybeDone}; + +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{Collect, FuturesOrdered, StreamExt}; fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() } - .iter_mut() - .map(|t| unsafe { Pin::new_unchecked(t) }) + unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) } -/// Future for the [`join_all`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] +/// Future for the [`join_all`] function. pub struct JoinAll<F> where F: Future, { - elems: Pin<Box<[MaybeDone<F>]>>, + kind: JoinAllKind<F>, +} + +#[cfg(not(futures_no_atomic_cas))] +const SMALL: usize = 30; + +pub(crate) enum JoinAllKind<F> +where + F: Future, +{ + Small { + elems: Pin<Box<[MaybeDone<F>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: Collect<FuturesOrdered<F>, Vec<F::Output>>, + }, } impl<F> fmt::Debug for JoinAll<F> @@ -36,9 +53,13 @@ where F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("JoinAll") - .field("elems", &self.elems) - .finish() + match self.kind { + JoinAllKind::Small { ref elems } => { + f.debug_struct("JoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -54,10 +75,9 @@ where /// /// # See Also /// -/// This is purposefully a very simple API for basic use-cases. In a lot of -/// cases you will want to use the more powerful -/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does -/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered]. +/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance +/// reasons if the number of futures is large. You may want to look into using it or +/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. /// /// Some examples for additional functionality provided by these are: /// @@ -79,13 +99,33 @@ where /// assert_eq!(join_all(futures).await, [1, 2, 3]); /// # }); /// ``` -pub fn join_all<I>(i: I) -> JoinAll<I::Item> +pub fn join_all<I>(iter: I) -> JoinAll<I::Item> where I: IntoIterator, I::Item: Future, { - let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect(); - assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() }) + #[cfg(futures_no_atomic_cas)] + { + let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into(); + let kind = JoinAllKind::Small { elems }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) + } + #[cfg(not(futures_no_atomic_cas))] + { + let iter = iter.into_iter(); + let kind = match iter.size_hint().1 { + None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, + Some(max) => { + if max <= SMALL { + let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(); + JoinAllKind::Small { elems } + } else { + JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() } + } + } + }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) + } } impl<F> Future for JoinAll<F> @@ -95,22 +135,27 @@ where type Output = Vec<F::Output>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut all_done = true; + match &mut self.kind { + JoinAllKind::Small { elems } => { + let mut all_done = true; - for elem in iter_pin_mut(self.elems.as_mut()) { - if elem.poll(cx).is_pending() { - all_done = false; - } - } + for elem in iter_pin_mut(elems.as_mut()) { + if elem.poll(cx).is_pending() { + all_done = false; + } + } - if all_done { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let result = iter_pin_mut(elems.as_mut()) - .map(|e| e.take_output().unwrap()) - .collect(); - Poll::Ready(result) - } else { - Poll::Pending + if all_done { + let mut elems = mem::replace(elems, Box::pin([])); + let result = + iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); + Poll::Ready(result) + } else { + Poll::Pending + } + } + #[cfg(not(futures_no_atomic_cas))] + JoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } diff --git a/src/future/lazy.rs b/src/future/lazy.rs index 42812d3..e9a8cf2 100644 --- a/src/future/lazy.rs +++ b/src/future/lazy.rs @@ -7,7 +7,7 @@ use futures_core::task::{Context, Poll}; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Lazy<F> { - f: Option<F> + f: Option<F>, } // safe because we never generate `Pin<&mut F>` @@ -33,19 +33,24 @@ impl<F> Unpin for Lazy<F> {} /// # }); /// ``` pub fn lazy<F, R>(f: F) -> Lazy<F> - where F: FnOnce(&mut Context<'_>) -> R, +where + F: FnOnce(&mut Context<'_>) -> R, { assert_future::<R, _>(Lazy { f: Some(f) }) } impl<F, R> FusedFuture for Lazy<F> - where F: FnOnce(&mut Context<'_>) -> R, +where + F: FnOnce(&mut Context<'_>) -> R, { - fn is_terminated(&self) -> bool { self.f.is_none() } + fn is_terminated(&self) -> bool { + self.f.is_none() + } } impl<F, R> Future for Lazy<F> - where F: FnOnce(&mut Context<'_>) -> R, +where + F: FnOnce(&mut Context<'_>) -> R, { type Output = R; diff --git a/src/future/mod.rs b/src/future/mod.rs index 84e457c..374e365 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -21,7 +21,7 @@ pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj}; #[allow(clippy::module_inception)] mod future; pub use self::future::{ - Flatten, Fuse, FutureExt, Inspect, IntoStream, Map, NeverError, Then, UnitError, MapInto, + Flatten, Fuse, FutureExt, Inspect, IntoStream, Map, MapInto, NeverError, Then, UnitError, }; #[deprecated(note = "This is now an alias for [Flatten](Flatten)")] @@ -40,8 +40,8 @@ pub use self::future::{Shared, WeakShared}; mod try_future; pub use self::try_future::{ - AndThen, ErrInto, OkInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, OrElse, TryFlattenStream, - TryFutureExt, UnwrapOrElse, MapOkOrElse, TryFlatten, + AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, MapOkOrElse, OkInto, + OrElse, TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse, }; #[cfg(feature = "sink")] @@ -68,6 +68,9 @@ pub use self::option::OptionFuture; mod poll_fn; pub use self::poll_fn::{poll_fn, PollFn}; +mod poll_immediate; +pub use self::poll_immediate::{poll_immediate, PollImmediate}; + mod ready; pub use self::ready::{err, ok, ready, Ready}; @@ -108,12 +111,15 @@ pub use self::select_ok::{select_ok, SelectOk}; mod either; pub use self::either::Either; -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - mod abortable; - #[cfg(feature = "alloc")] - pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted}; -} +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod abortable; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted}; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +pub use abortable::abortable; // Just a helper function to ensure the futures we're returning all have the // right implementations. diff --git a/src/future/option.rs b/src/future/option.rs index 85939d6..0bc3777 100644 --- a/src/future/option.rs +++ b/src/future/option.rs @@ -1,7 +1,7 @@ //! Definition of the `Option` (optional step) combinator use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -31,13 +31,16 @@ pin_project! { } } +impl<F> Default for OptionFuture<F> { + fn default() -> Self { + Self { inner: None } + } +} + impl<F: Future> Future for OptionFuture<F> { type Output = Option<F::Output>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.project().inner.as_pin_mut() { Some(x) => x.poll(cx).map(Some), None => Poll::Ready(None), diff --git a/src/future/pending.rs b/src/future/pending.rs index 4311b9a..92c78d5 100644 --- a/src/future/pending.rs +++ b/src/future/pending.rs @@ -34,9 +34,7 @@ impl<T> FusedFuture for Pending<T> { /// # }); /// ``` pub fn pending<T>() -> Pending<T> { - assert_future::<T, _>(Pending { - _data: marker::PhantomData, - }) + assert_future::<T, _>(Pending { _data: marker::PhantomData }) } impl<T> Future for Pending<T> { @@ -47,8 +45,7 @@ impl<T> Future for Pending<T> { } } -impl<T> Unpin for Pending<T> { -} +impl<T> Unpin for Pending<T> {} impl<T> Clone for Pending<T> { fn clone(&self) -> Self { diff --git a/src/future/poll_fn.rs b/src/future/poll_fn.rs index 6ac1ab8..1931157 100644 --- a/src/future/poll_fn.rs +++ b/src/future/poll_fn.rs @@ -35,7 +35,7 @@ impl<F> Unpin for PollFn<F> {} /// ``` pub fn poll_fn<T, F>(f: F) -> PollFn<F> where - F: FnMut(&mut Context<'_>) -> Poll<T> + F: FnMut(&mut Context<'_>) -> Poll<T>, { assert_future::<T, _>(PollFn { f }) } @@ -47,7 +47,8 @@ impl<F> fmt::Debug for PollFn<F> { } impl<T, F> Future for PollFn<F> - where F: FnMut(&mut Context<'_>) -> Poll<T>, +where + F: FnMut(&mut Context<'_>) -> Poll<T>, { type Output = T; diff --git a/src/future/poll_immediate.rs b/src/future/poll_immediate.rs new file mode 100644 index 0000000..5ae555c --- /dev/null +++ b/src/future/poll_immediate.rs @@ -0,0 +1,126 @@ +use super::assert_future; +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_core::{FusedFuture, Future, Stream}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`poll_immediate`](poll_immediate()) function. + /// + /// It will never return [Poll::Pending](core::task::Poll::Pending) + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate<T> { + #[pin] + future: Option<T> + } +} + +impl<T, F> Future for PollImmediate<F> +where + F: Future<Output = T>, +{ + type Output = Option<T>; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let mut this = self.project(); + let inner = + this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion"); + match inner.poll(cx) { + Poll::Ready(t) => { + this.future.set(None); + Poll::Ready(Some(t)) + } + Poll::Pending => Poll::Ready(None), + } + } +} + +impl<T: Future> FusedFuture for PollImmediate<T> { + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done. +/// The stream will never return [Poll::Pending](core::task::Poll::Pending) +/// so polling it in a tight loop is worse than using a blocking synchronous function. +/// ``` +/// # futures::executor::block_on(async { +/// use futures::task::Poll; +/// use futures::{StreamExt, future, pin_mut}; +/// use future::FusedFuture; +/// +/// let f = async { 1_u32 }; +/// pin_mut!(f); +/// let mut r = future::poll_immediate(f); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// let mut p = future::poll_immediate(f); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert!(!p.is_terminated()); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert!(p.is_terminated()); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +impl<T, F> Stream for PollImmediate<F> +where + F: Future<Output = T>, +{ + type Item = Poll<T>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + match this.future.as_mut().as_pin_mut() { + // inner is gone, so we can signal that the stream is closed. + None => Poll::Ready(None), + Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| { + this.future.set(None); + t + }))), + } + } +} + +/// Creates a future that is immediately ready with an Option of a value. +/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready). +/// +/// # Caution +/// +/// When consuming the future by this function, note the following: +/// +/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value. +/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let r = future::poll_immediate(async { 1_u32 }); +/// assert_eq!(r.await, Some(1)); +/// +/// let p = future::poll_immediate(future::pending::<i32>()); +/// assert_eq!(p.await, None); +/// # }); +/// ``` +/// +/// ### Reusing a future +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::{future, pin_mut}; +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// assert_eq!(None, future::poll_immediate(&mut f).await); +/// assert_eq!(42, f.await); +/// # }); +/// ``` +pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> { + assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) }) +} diff --git a/src/future/select.rs b/src/future/select.rs index 043ed17..bd44f20 100644 --- a/src/future/select.rs +++ b/src/future/select.rs @@ -1,8 +1,8 @@ use super::assert_future; +use crate::future::{Either, FutureExt}; use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use crate::future::{Either, FutureExt}; /// Future for the [`select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] @@ -37,13 +37,13 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} /// future::Either, /// future::self, /// }; -/// +/// /// // These two futures have different types even though their outputs have the same type. /// let future1 = async { /// future::pending::<()>().await; // will never finish /// 1 /// }; -/// let future2 = async { +/// let future2 = async { /// future::ready(2).await /// }; /// @@ -82,9 +82,13 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} /// } /// ``` pub fn select<A, B>(future1: A, future2: B) -> Select<A, B> - where A: Future + Unpin, B: Future + Unpin +where + A: Future + Unpin, + B: Future + Unpin, { - assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select { inner: Some((future1, future2)) }) + assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select { + inner: Some((future1, future2)), + }) } impl<A, B> Future for Select<A, B> @@ -104,7 +108,7 @@ where self.inner = Some((a, b)); Poll::Pending } - } + }, } } } diff --git a/src/future/select_all.rs b/src/future/select_all.rs index 0db90a7..106e508 100644 --- a/src/future/select_all.rs +++ b/src/future/select_all.rs @@ -1,9 +1,9 @@ use super::assert_future; use crate::future::FutureExt; +use alloc::vec::Vec; use core::iter::FromIterator; use core::mem; use core::pin::Pin; -use alloc::vec::Vec; use futures_core::future::Future; use futures_core::task::{Context, Poll}; @@ -32,25 +32,29 @@ impl<Fut: Unpin> Unpin for SelectAll<Fut> {} /// /// This function will panic if the iterator specified contains no items. pub fn select_all<I>(iter: I) -> SelectAll<I::Item> - where I: IntoIterator, - I::Item: Future + Unpin, +where + I: IntoIterator, + I::Item: Future + Unpin, { - let ret = SelectAll { - inner: iter.into_iter().collect() - }; + let ret = SelectAll { inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty()); assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret) } +impl<Fut> SelectAll<Fut> { + /// Consumes this combinator, returning the underlying futures. + pub fn into_inner(self) -> Vec<Fut> { + self.inner + } +} + impl<Fut: Future + Unpin> Future for SelectAll<Fut> { type Output = (Fut::Output, usize, Vec<Fut>); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| { - match f.poll_unpin(cx) { - Poll::Pending => None, - Poll::Ready(e) => Some((i, e)), - } + let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), }); match item { Some((idx, res)) => { diff --git a/src/future/select_ok.rs b/src/future/select_ok.rs index 52d393c..0ad83c6 100644 --- a/src/future/select_ok.rs +++ b/src/future/select_ok.rs @@ -1,9 +1,9 @@ use super::assert_future; use crate::future::TryFutureExt; +use alloc::vec::Vec; use core::iter::FromIterator; use core::mem; use core::pin::Pin; -use alloc::vec::Vec; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; @@ -30,14 +30,16 @@ impl<Fut: Unpin> Unpin for SelectOk<Fut> {} /// /// This function will panic if the iterator specified contains no items. pub fn select_ok<I>(iter: I) -> SelectOk<I::Item> - where I: IntoIterator, - I::Item: TryFuture + Unpin, +where + I: IntoIterator, + I::Item: TryFuture + Unpin, { - let ret = SelectOk { - inner: iter.into_iter().collect() - }; + let ret = SelectOk { inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); - assert_future::<Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>, _>(ret) + assert_future::< + Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>, + _, + >(ret) } impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { @@ -46,12 +48,11 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { - let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| { - match f.try_poll_unpin(cx) { + let item = + self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { Poll::Pending => None, Poll::Ready(e) => Some((i, e)), - } - }); + }); match item { Some((idx, res)) => { // always remove Ok or Err, if it's not the last Err continue looping @@ -59,18 +60,18 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { match res { Ok(e) => { let rest = mem::replace(&mut self.inner, Vec::new()); - return Poll::Ready(Ok((e, rest))) + return Poll::Ready(Ok((e, rest))); } Err(e) => { if self.inner.is_empty() { - return Poll::Ready(Err(e)) + return Poll::Ready(Err(e)); } } } } None => { // based on the filter above, nothing is ready, return - return Poll::Pending + return Poll::Pending; } } } diff --git a/src/future/try_future/into_future.rs b/src/future/try_future/into_future.rs index e88d603..9f093d0 100644 --- a/src/future/try_future/into_future.rs +++ b/src/future/try_future/into_future.rs @@ -21,17 +21,16 @@ impl<Fut> IntoFuture<Fut> { } impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> { - fn is_terminated(&self) -> bool { self.future.is_terminated() } + fn is_terminated(&self) -> bool { + self.future.is_terminated() + } } impl<Fut: TryFuture> Future for IntoFuture<Fut> { type Output = Result<Fut::Ok, Fut::Error>; #[inline] - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.project().future.try_poll(cx) } } diff --git a/src/future/try_future/try_flatten.rs b/src/future/try_future/try_flatten.rs index 5241b27..1ce4559 100644 --- a/src/future/try_future/try_flatten.rs +++ b/src/future/try_future/try_flatten.rs @@ -2,9 +2,9 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::ready; use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { @@ -24,8 +24,9 @@ impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> { } impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok> - where Fut: TryFuture, - Fut::Ok: TryFuture<Error=Fut::Error>, +where + Fut: TryFuture, + Fut::Ok: TryFuture<Error = Fut::Error>, { fn is_terminated(&self) -> bool { match self { @@ -36,28 +37,27 @@ impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok> } impl<Fut> Future for TryFlatten<Fut, Fut::Ok> - where Fut: TryFuture, - Fut::Ok: TryFuture<Error=Fut::Error>, +where + Fut: TryFuture, + Fut::Ok: TryFuture<Error = Fut::Error>, { type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => { - match ready!(f.try_poll(cx)) { - Ok(f) => self.set(Self::Second { f }), - Err(e) => { - self.set(Self::Empty); - break Err(e); - } + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Err(e); } }, TryFlattenProj::Second { f } => { let output = ready!(f.try_poll(cx)); self.set(Self::Empty); break output; - }, + } TryFlattenProj::Empty => panic!("TryFlatten polled after completion"), } }) @@ -65,8 +65,9 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok> } impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok> - where Fut: TryFuture, - Fut::Ok: TryStream<Error=Fut::Error>, +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, { fn is_terminated(&self) -> bool { match self { @@ -77,21 +78,20 @@ impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok> } impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> - where Fut: TryFuture, - Fut::Ok: TryStream<Error=Fut::Error>, +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, { type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => { - match ready!(f.try_poll(cx)) { - Ok(f) => self.set(Self::Second { f }), - Err(e) => { - self.set(Self::Empty); - break Some(Err(e)); - } + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Some(Err(e)); } }, TryFlattenProj::Second { f } => { @@ -100,40 +100,34 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> self.set(Self::Empty); } break output; - }, + } TryFlattenProj::Empty => break None, } }) } } - #[cfg(feature = "sink")] impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok> where Fut: TryFuture, - Fut::Ok: Sink<Item, Error=Fut::Error>, + Fut::Ok: Sink<Item, Error = Fut::Error>, { type Error = Fut::Error; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => { - match ready!(f.try_poll(cx)) { - Ok(f) => self.set(Self::Second { f }), - Err(e) => { - self.set(Self::Empty); - break Err(e); - } + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Err(e); } }, TryFlattenProj::Second { f } => { break ready!(f.poll_ready(cx)); - }, + } TryFlattenProj::Empty => panic!("poll_ready called after eof"), } }) @@ -155,10 +149,7 @@ where } } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { let res = match self.as_mut().project() { TryFlattenProj::Second { f } => f.poll_close(cx), _ => Poll::Ready(Ok(())), diff --git a/src/future/try_future/try_flatten_err.rs b/src/future/try_future/try_flatten_err.rs index 2e67f11..39b7d9f 100644 --- a/src/future/try_future/try_flatten_err.rs +++ b/src/future/try_future/try_flatten_err.rs @@ -21,8 +21,9 @@ impl<Fut1, Fut2> TryFlattenErr<Fut1, Fut2> { } impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error> - where Fut: TryFuture, - Fut::Error: TryFuture<Ok=Fut::Ok>, +where + Fut: TryFuture, + Fut::Error: TryFuture<Ok = Fut::Ok>, { fn is_terminated(&self) -> bool { match self { @@ -33,28 +34,27 @@ impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error> } impl<Fut> Future for TryFlattenErr<Fut, Fut::Error> - where Fut: TryFuture, - Fut::Error: TryFuture<Ok=Fut::Ok>, +where + Fut: TryFuture, + Fut::Error: TryFuture<Ok = Fut::Ok>, { type Output = Result<Fut::Ok, <Fut::Error as TryFuture>::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenErrProj::First { f } => { - match ready!(f.try_poll(cx)) { - Err(f) => self.set(Self::Second { f }), - Ok(e) => { - self.set(Self::Empty); - break Ok(e); - } + TryFlattenErrProj::First { f } => match ready!(f.try_poll(cx)) { + Err(f) => self.set(Self::Second { f }), + Ok(e) => { + self.set(Self::Empty); + break Ok(e); } }, TryFlattenErrProj::Second { f } => { let output = ready!(f.try_poll(cx)); self.set(Self::Empty); break output; - }, + } TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"), } }) diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs index 371f753..29244af 100644 --- a/src/future/try_join_all.rs +++ b/src/future/try_join_all.rs @@ -1,14 +1,14 @@ //! Definition of the `TryJoinAll` combinator, waiting for all of a list of //! futures to finish with either success or error. +use alloc::boxed::Box; +use alloc::vec::Vec; use core::fmt; use core::future::Future; use core::iter::FromIterator; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use alloc::boxed::Box; -use alloc::vec::Vec; use super::{assert_future, TryFuture, TryMaybeDone}; @@ -16,15 +16,13 @@ fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() } - .iter_mut() - .map(|t| unsafe { Pin::new_unchecked(t) }) + unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) } enum FinalState<E = ()> { Pending, AllDone, - Error(E) + Error(E), } /// Future for the [`try_join_all`] function. @@ -43,9 +41,7 @@ where F::Error: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll") - .field("elems", &self.elems) - .finish() + f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() } } @@ -93,9 +89,9 @@ where I::Item: TryFuture, { let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(TryJoinAll { - elems: elems.into(), - }) + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { elems: elems.into() }, + ) } impl<F> Future for TryJoinAll<F> @@ -110,7 +106,7 @@ where for elem in iter_pin_mut(self.elems.as_mut()) { match elem.try_poll(cx) { Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {}, + Poll::Ready(Ok(())) => {} Poll::Ready(Err(e)) => { state = FinalState::Error(e); break; @@ -122,15 +118,14 @@ where FinalState::Pending => Poll::Pending, FinalState::AllDone => { let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = iter_pin_mut(elems.as_mut()) - .map(|e| e.take_output().unwrap()) - .collect(); + let results = + iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); Poll::Ready(Ok(results)) - }, + } FinalState::Error(e) => { let _ = mem::replace(&mut self.elems, Box::pin([])); Poll::Ready(Err(e)) - }, + } } } } diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs index dfd2900..24044d2 100644 --- a/src/future/try_maybe_done.rs +++ b/src/future/try_maybe_done.rs @@ -49,13 +49,13 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> { #[inline] pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> { match &*self { - Self::Done(_) => {}, + Self::Done(_) => {} Self::Future(_) | Self::Gone => return None, } unsafe { match mem::replace(self.get_unchecked_mut(), Self::Gone) { TryMaybeDone::Done(output) => Some(output), - _ => unreachable!() + _ => unreachable!(), } } } @@ -76,16 +76,14 @@ impl<Fut: TryFuture> Future for TryMaybeDone<Fut> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { unsafe { match self.as_mut().get_unchecked_mut() { - TryMaybeDone::Future(f) => { - match ready!(Pin::new_unchecked(f).try_poll(cx)) { - Ok(res) => self.set(Self::Done(res)), - Err(e) => { - self.set(Self::Gone); - return Poll::Ready(Err(e)); - } + TryMaybeDone::Future(f) => match ready!(Pin::new_unchecked(f).try_poll(cx)) { + Ok(res) => self.set(Self::Done(res)), + Err(e) => { + self.set(Self::Gone); + return Poll::Ready(Err(e)); } }, - TryMaybeDone::Done(_) => {}, + TryMaybeDone::Done(_) => {} TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"), } } diff --git a/src/future/try_select.rs b/src/future/try_select.rs index b26eed3..4d0b7ff 100644 --- a/src/future/try_select.rs +++ b/src/future/try_select.rs @@ -1,7 +1,7 @@ +use crate::future::{Either, TryFutureExt}; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; -use crate::future::{Either, TryFutureExt}; /// Future for the [`try_select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] @@ -48,22 +48,23 @@ impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {} /// } /// ``` pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B> - where A: TryFuture + Unpin, B: TryFuture + Unpin +where + A: TryFuture + Unpin, + B: TryFuture + Unpin, { - super::assert_future::<Result< - Either<(A::Ok, B), (B::Ok, A)>, - Either<(A::Error, B), (B::Error, A)>, - >, _>(TrySelect { inner: Some((future1, future2)) }) + super::assert_future::< + Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>, + _, + >(TrySelect { inner: Some((future1, future2)) }) } impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> - where A: TryFuture, B: TryFuture +where + A: TryFuture, + B: TryFuture, { #[allow(clippy::type_complexity)] - type Output = Result< - Either<(A::Ok, B), (B::Ok, A)>, - Either<(A::Error, B), (B::Error, A)>, - >; + type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); @@ -77,7 +78,7 @@ impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> self.inner = Some((a, b)); Poll::Pending } - } + }, } } } |