aboutsummaryrefslogtreecommitdiff
path: root/src/future
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-10 07:03:29 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-10 07:03:29 +0000
commitec2853e697ebfe66828610e926981841e4879803 (patch)
tree7b826874213015b98b9e0383a89c88be66b64b11 /src/future
parent5a2ead4f79e3b4c604edd450714bd1d925de1d4e (diff)
parente177aca7d9e335e8989e592627752459e00df418 (diff)
downloadfutures-util-b94cc39404d16f92b2d950731f1f4c7866b5f4e7.tar.gz
Change-Id: I94036737fcc6a3b9c10172541508602e2eda7fb2
Diffstat (limited to 'src/future')
-rw-r--r--src/future/abortable.rs170
-rw-r--r--src/future/either.rs31
-rw-r--r--src/future/future/catch_unwind.rs10
-rw-r--r--src/future/future/flatten.rs47
-rw-r--r--src/future/future/fuse.rs4
-rw-r--r--src/future/future/remote_handle.rs14
-rw-r--r--src/future/future/shared.rs35
-rw-r--r--src/future/join.rs11
-rw-r--r--src/future/join_all.rs109
-rw-r--r--src/future/lazy.rs15
-rw-r--r--src/future/mod.rs24
-rw-r--r--src/future/option.rs13
-rw-r--r--src/future/pending.rs7
-rw-r--r--src/future/poll_fn.rs5
-rw-r--r--src/future/poll_immediate.rs126
-rw-r--r--src/future/select.rs18
-rw-r--r--src/future/select_all.rs26
-rw-r--r--src/future/select_ok.rs29
-rw-r--r--src/future/try_future/into_future.rs9
-rw-r--r--src/future/try_future/try_flatten.rs77
-rw-r--r--src/future/try_future/try_flatten_err.rs24
-rw-r--r--src/future/try_join_all.rs31
-rw-r--r--src/future/try_maybe_done.rs18
-rw-r--r--src/future/try_select.rs25
24 files changed, 441 insertions, 437 deletions
diff --git a/src/future/abortable.rs b/src/future/abortable.rs
index 3f2e5a0..d017ab7 100644
--- a/src/future/abortable.rs
+++ b/src/future/abortable.rs
@@ -1,110 +1,8 @@
use super::assert_future;
-use crate::task::AtomicWaker;
+use crate::future::{AbortHandle, Abortable, Aborted};
use futures_core::future::Future;
-use futures_core::task::{Context, Poll};
-use core::fmt;
-use core::pin::Pin;
-use core::sync::atomic::{AtomicBool, Ordering};
-use alloc::sync::Arc;
-use pin_project_lite::pin_project;
-pin_project! {
- /// A future which can be remotely short-circuited using an `AbortHandle`.
- #[derive(Debug, Clone)]
- #[must_use = "futures do nothing unless you `.await` or poll them"]
- pub struct Abortable<Fut> {
- #[pin]
- future: Fut,
- inner: Arc<AbortInner>,
- }
-}
-
-impl<Fut> Abortable<Fut> where Fut: Future {
- /// Creates a new `Abortable` future using an existing `AbortRegistration`.
- /// `AbortRegistration`s can be acquired through `AbortHandle::new`.
- ///
- /// When `abort` is called on the handle tied to `reg` or if `abort` has
- /// already been called, the future will complete immediately without making
- /// any further progress.
- ///
- /// Example:
- ///
- /// ```
- /// # futures::executor::block_on(async {
- /// use futures::future::{Abortable, AbortHandle, Aborted};
- ///
- /// let (abort_handle, abort_registration) = AbortHandle::new_pair();
- /// let future = Abortable::new(async { 2 }, abort_registration);
- /// abort_handle.abort();
- /// assert_eq!(future.await, Err(Aborted));
- /// # });
- /// ```
- pub fn new(future: Fut, reg: AbortRegistration) -> Self {
- assert_future::<Result<Fut::Output, Aborted>, _>(Self {
- future,
- inner: reg.inner,
- })
- }
-}
-
-/// A registration handle for a `Abortable` future.
-/// Values of this type can be acquired from `AbortHandle::new` and are used
-/// in calls to `Abortable::new`.
-#[derive(Debug)]
-pub struct AbortRegistration {
- inner: Arc<AbortInner>,
-}
-
-/// A handle to a `Abortable` future.
-#[derive(Debug, Clone)]
-pub struct AbortHandle {
- inner: Arc<AbortInner>,
-}
-
-impl AbortHandle {
- /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used
- /// to abort a running future.
- ///
- /// This function is usually paired with a call to `Abortable::new`.
- ///
- /// Example:
- ///
- /// ```
- /// # futures::executor::block_on(async {
- /// use futures::future::{Abortable, AbortHandle, Aborted};
- ///
- /// let (abort_handle, abort_registration) = AbortHandle::new_pair();
- /// let future = Abortable::new(async { 2 }, abort_registration);
- /// abort_handle.abort();
- /// assert_eq!(future.await, Err(Aborted));
- /// # });
- /// ```
- pub fn new_pair() -> (Self, AbortRegistration) {
- let inner = Arc::new(AbortInner {
- waker: AtomicWaker::new(),
- cancel: AtomicBool::new(false),
- });
-
- (
- Self {
- inner: inner.clone(),
- },
- AbortRegistration {
- inner,
- },
- )
- }
-}
-
-// Inner type storing the waker to awaken and a bool indicating that it
-// should be cancelled.
-#[derive(Debug)]
-struct AbortInner {
- waker: AtomicWaker,
- cancel: AtomicBool,
-}
-
-/// Creates a new `Abortable` future and a `AbortHandle` which can be used to stop it.
+/// Creates a new `Abortable` future and an `AbortHandle` which can be used to stop it.
///
/// This function is a convenient (but less flexible) alternative to calling
/// `AbortHandle::new` and `Abortable::new` manually.
@@ -112,66 +10,10 @@ struct AbortInner {
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle)
- where Fut: Future
+where
+ Fut: Future,
{
let (handle, reg) = AbortHandle::new_pair();
- (
- Abortable::new(future, reg),
- handle,
- )
-}
-
-/// Indicator that the `Abortable` future was aborted.
-#[derive(Copy, Clone, Debug, Eq, PartialEq)]
-pub struct Aborted;
-
-impl fmt::Display for Aborted {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "`Abortable` future has been aborted")
- }
-}
-
-#[cfg(feature = "std")]
-impl std::error::Error for Aborted {}
-
-impl<Fut> Future for Abortable<Fut> where Fut: Future {
- type Output = Result<Fut::Output, Aborted>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- // Check if the future has been aborted
- if self.inner.cancel.load(Ordering::Relaxed) {
- return Poll::Ready(Err(Aborted))
- }
-
- // attempt to complete the future
- if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) {
- return Poll::Ready(Ok(x))
- }
-
- // Register to receive a wakeup if the future is aborted in the... future
- self.inner.waker.register(cx.waker());
-
- // Check to see if the future was aborted between the first check and
- // registration.
- // Checking with `Relaxed` is sufficient because `register` introduces an
- // `AcqRel` barrier.
- if self.inner.cancel.load(Ordering::Relaxed) {
- return Poll::Ready(Err(Aborted))
- }
-
- Poll::Pending
- }
-}
-
-impl AbortHandle {
- /// Abort the `Abortable` future associated with this handle.
- ///
- /// Notifies the Abortable future associated with this handle that it
- /// should abort. Note that if the future is currently being polled on
- /// another thread, it will not immediately stop running. Instead, it will
- /// continue to run until its poll method returns.
- pub fn abort(&self) {
- self.inner.cancel.store(true, Ordering::Relaxed);
- self.inner.waker.wake();
- }
+ let abortable = assert_future::<Result<Fut::Output, Aborted>, _>(Abortable::new(future, reg));
+ (abortable, handle)
}
diff --git a/src/future/either.rs b/src/future/either.rs
index 5f5b614..9602de7 100644
--- a/src/future/either.rs
+++ b/src/future/either.rs
@@ -5,8 +5,25 @@ use futures_core::stream::{FusedStream, Stream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-/// Combines two different futures, streams, or sinks having the same associated types into a single
-/// type.
+/// Combines two different futures, streams, or sinks having the same associated types into a single type.
+///
+/// This is useful when conditionally choosing between two distinct future types:
+///
+/// ```rust
+/// use futures::future::Either;
+///
+/// # futures::executor::block_on(async {
+/// let cond = true;
+///
+/// let fut = if cond {
+/// Either::Left(async move { 12 })
+/// } else {
+/// Either::Right(async move { 44 })
+/// };
+///
+/// assert_eq!(fut.await, 12);
+/// # })
+/// ```
#[derive(Debug, Clone)]
pub enum Either<A, B> {
/// First branch of the type
@@ -167,8 +184,6 @@ mod if_std {
use core::pin::Pin;
use core::task::{Context, Poll};
- #[cfg(feature = "read-initializer")]
- use futures_io::Initializer;
use futures_io::{
AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom,
};
@@ -178,14 +193,6 @@ mod if_std {
A: AsyncRead,
B: AsyncRead,
{
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- match self {
- Either::Left(x) => x.initializer(),
- Either::Right(x) => x.initializer(),
- }
- }
-
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
diff --git a/src/future/future/catch_unwind.rs b/src/future/future/catch_unwind.rs
index 3f16577..0e09d6e 100644
--- a/src/future/future/catch_unwind.rs
+++ b/src/future/future/catch_unwind.rs
@@ -1,6 +1,6 @@
use core::any::Any;
use core::pin::Pin;
-use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
+use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe};
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
@@ -16,14 +16,18 @@ pin_project! {
}
}
-impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe {
+impl<Fut> CatchUnwind<Fut>
+where
+ Fut: Future + UnwindSafe,
+{
pub(super) fn new(future: Fut) -> Self {
Self { future }
}
}
impl<Fut> Future for CatchUnwind<Fut>
- where Fut: Future + UnwindSafe,
+where
+ Fut: Future + UnwindSafe,
{
type Output = Result<Fut::Output, Box<dyn Any + Send>>;
diff --git a/src/future/future/flatten.rs b/src/future/future/flatten.rs
index 0c48a4f..bd767af 100644
--- a/src/future/future/flatten.rs
+++ b/src/future/future/flatten.rs
@@ -2,9 +2,9 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
pin_project! {
@@ -24,8 +24,9 @@ impl<Fut1, Fut2> Flatten<Fut1, Fut2> {
}
impl<Fut> FusedFuture for Flatten<Fut, Fut::Output>
- where Fut: Future,
- Fut::Output: Future,
+where
+ Fut: Future,
+ Fut::Output: Future,
{
fn is_terminated(&self) -> bool {
match self {
@@ -36,8 +37,9 @@ impl<Fut> FusedFuture for Flatten<Fut, Fut::Output>
}
impl<Fut> Future for Flatten<Fut, Fut::Output>
- where Fut: Future,
- Fut::Output: Future,
+where
+ Fut: Future,
+ Fut::Output: Future,
{
type Output = <Fut::Output as Future>::Output;
@@ -47,12 +49,12 @@ impl<Fut> Future for Flatten<Fut, Fut::Output>
FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
self.set(Self::Second { f });
- },
+ }
FlattenProj::Second { f } => {
let output = ready!(f.poll(cx));
self.set(Self::Empty);
break output;
- },
+ }
FlattenProj::Empty => panic!("Flatten polled after completion"),
}
})
@@ -60,8 +62,9 @@ impl<Fut> Future for Flatten<Fut, Fut::Output>
}
impl<Fut> FusedStream for Flatten<Fut, Fut::Output>
- where Fut: Future,
- Fut::Output: Stream,
+where
+ Fut: Future,
+ Fut::Output: Stream,
{
fn is_terminated(&self) -> bool {
match self {
@@ -72,32 +75,32 @@ impl<Fut> FusedStream for Flatten<Fut, Fut::Output>
}
impl<Fut> Stream for Flatten<Fut, Fut::Output>
- where Fut: Future,
- Fut::Output: Stream,
+where
+ Fut: Future,
+ Fut::Output: Stream,
{
type Item = <Fut::Output as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
match self.as_mut().project() {
- FlattenProj::First { f } => {
+ FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
self.set(Self::Second { f });
- },
+ }
FlattenProj::Second { f } => {
let output = ready!(f.poll_next(cx));
if output.is_none() {
self.set(Self::Empty);
}
break output;
- },
+ }
FlattenProj::Empty => break None,
}
})
}
}
-
#[cfg(feature = "sink")]
impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output>
where
@@ -106,19 +109,16 @@ where
{
type Error = <Fut::Output as Sink<Item>>::Error;
- fn poll_ready(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(loop {
match self.as_mut().project() {
FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
self.set(Self::Second { f });
- },
+ }
FlattenProj::Second { f } => {
break ready!(f.poll_ready(cx));
- },
+ }
FlattenProj::Empty => panic!("poll_ready called after eof"),
}
})
@@ -140,10 +140,7 @@ where
}
}
- fn poll_close(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let res = match self.as_mut().project() {
FlattenProj::Second { f } => f.poll_close(cx),
_ => Poll::Ready(Ok(())),
diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs
index f4284ba..597aec1 100644
--- a/src/future/future/fuse.rs
+++ b/src/future/future/fuse.rs
@@ -1,5 +1,5 @@
use core::pin::Pin;
-use futures_core::future::{Future, FusedFuture};
+use futures_core::future::{FusedFuture, Future};
use futures_core::ready;
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
@@ -86,7 +86,7 @@ impl<Fut: Future> Future for Fuse<Fut> {
let output = ready!(fut.poll(cx));
self.project().inner.set(None);
output
- },
+ }
None => return Poll::Pending,
})
}
diff --git a/src/future/future/remote_handle.rs b/src/future/future/remote_handle.rs
index 0d33ea5..1358902 100644
--- a/src/future/future/remote_handle.rs
+++ b/src/future/future/remote_handle.rs
@@ -1,23 +1,23 @@
use {
crate::future::{CatchUnwind, FutureExt},
- futures_channel::oneshot::{self, Sender, Receiver},
+ futures_channel::oneshot::{self, Receiver, Sender},
futures_core::{
future::Future,
- task::{Context, Poll},
ready,
+ task::{Context, Poll},
},
+ pin_project_lite::pin_project,
std::{
any::Any,
fmt,
panic::{self, AssertUnwindSafe},
pin::Pin,
sync::{
- Arc,
atomic::{AtomicBool, Ordering},
+ Arc,
},
thread,
},
- pin_project_lite::pin_project,
};
/// The handle to a remote future returned by
@@ -36,7 +36,7 @@ use {
/// must be careful with regard to unwind safety because the thread in which the future
/// is polled will keep running after the panic and the thread running the [RemoteHandle]
/// will unwind.
-#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[must_use = "dropping a remote handle cancels the underlying future"]
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
pub struct RemoteHandle<T> {
@@ -85,9 +85,7 @@ pin_project! {
impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_tuple("Remote")
- .field(&self.future)
- .finish()
+ f.debug_tuple("Remote").field(&self.future).finish()
}
}
diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs
index 74311a0..9b31932 100644
--- a/src/future/future/shared.rs
+++ b/src/future/future/shared.rs
@@ -29,6 +29,12 @@ struct Notifier {
/// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`.
pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>);
+impl<Fut: Future> Clone for WeakShared<Fut> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+
// The future itself is polled behind the `Arc`, so it won't be moved
// when `Shared` is moved.
impl<Fut: Future> Unpin for Shared<Fut> {}
@@ -90,10 +96,7 @@ impl<Fut: Future> Shared<Fut> {
}),
};
- Self {
- inner: Some(Arc::new(inner)),
- waker_key: NULL_WAKER_KEY,
- }
+ Self { inner: Some(Arc::new(inner)), waker_key: NULL_WAKER_KEY }
}
}
@@ -223,10 +226,7 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
- let inner = this
- .inner
- .take()
- .expect("Shared future polled again after completion");
+ let inner = this.inner.take().expect("Shared future polled again after completion");
// Fast path for when the wrapped future has already completed
if inner.notifier.state.load(Acquire) == COMPLETE {
@@ -286,11 +286,7 @@ where
match future.poll(&mut cx) {
Poll::Pending => {
- if inner
- .notifier
- .state
- .compare_exchange(POLLING, IDLE, SeqCst, SeqCst)
- .is_ok()
+ if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok()
{
// Success
drop(_reset);
@@ -330,10 +326,7 @@ where
Fut: Future,
{
fn clone(&self) -> Self {
- Self {
- inner: self.inner.clone(),
- waker_key: NULL_WAKER_KEY,
- }
+ Self { inner: self.inner.clone(), waker_key: NULL_WAKER_KEY }
}
}
@@ -367,16 +360,12 @@ impl ArcWake for Notifier {
}
}
-impl<Fut: Future> WeakShared<Fut>
-{
+impl<Fut: Future> WeakShared<Fut> {
/// Attempts to upgrade this [`WeakShared`] into a [`Shared`].
///
/// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled
/// to completion.
pub fn upgrade(&self) -> Option<Shared<Fut>> {
- Some(Shared {
- inner: Some(self.0.upgrade()?),
- waker_key: NULL_WAKER_KEY,
- })
+ Some(Shared { inner: Some(self.0.upgrade()?), waker_key: NULL_WAKER_KEY })
}
}
diff --git a/src/future/join.rs b/src/future/join.rs
index a818343..740ffbc 100644
--- a/src/future/join.rs
+++ b/src/future/join.rs
@@ -213,14 +213,5 @@ where
Fut5: Future,
{
let f = Join5::new(future1, future2, future3, future4, future5);
- assert_future::<
- (
- Fut1::Output,
- Fut2::Output,
- Fut3::Output,
- Fut4::Output,
- Fut5::Output,
- ),
- _,
- >(f)
+ assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output, Fut5::Output), _>(f)
}
diff --git a/src/future/join_all.rs b/src/future/join_all.rs
index 7ccf869..2e52ac1 100644
--- a/src/future/join_all.rs
+++ b/src/future/join_all.rs
@@ -1,33 +1,50 @@
//! Definition of the `JoinAll` combinator, waiting for all of a list of futures
//! to finish.
+use alloc::boxed::Box;
+use alloc::vec::Vec;
use core::fmt;
use core::future::Future;
use core::iter::FromIterator;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
-use alloc::boxed::Box;
-use alloc::vec::Vec;
-use super::{MaybeDone, assert_future};
+use super::{assert_future, MaybeDone};
+
+#[cfg(not(futures_no_atomic_cas))]
+use crate::stream::{Collect, FuturesOrdered, StreamExt};
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
- unsafe { slice.get_unchecked_mut() }
- .iter_mut()
- .map(|t| unsafe { Pin::new_unchecked(t) })
+ unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
}
-/// Future for the [`join_all`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
+/// Future for the [`join_all`] function.
pub struct JoinAll<F>
where
F: Future,
{
- elems: Pin<Box<[MaybeDone<F>]>>,
+ kind: JoinAllKind<F>,
+}
+
+#[cfg(not(futures_no_atomic_cas))]
+const SMALL: usize = 30;
+
+pub(crate) enum JoinAllKind<F>
+where
+ F: Future,
+{
+ Small {
+ elems: Pin<Box<[MaybeDone<F>]>>,
+ },
+ #[cfg(not(futures_no_atomic_cas))]
+ Big {
+ fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
+ },
}
impl<F> fmt::Debug for JoinAll<F>
@@ -36,9 +53,13 @@ where
F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("JoinAll")
- .field("elems", &self.elems)
- .finish()
+ match self.kind {
+ JoinAllKind::Small { ref elems } => {
+ f.debug_struct("JoinAll").field("elems", elems).finish()
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
+ }
}
}
@@ -54,10 +75,9 @@ where
///
/// # See Also
///
-/// This is purposefully a very simple API for basic use-cases. In a lot of
-/// cases you will want to use the more powerful
-/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
-/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
+/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
+/// reasons if the number of futures is large. You may want to look into using it or
+/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
///
/// Some examples for additional functionality provided by these are:
///
@@ -79,13 +99,33 @@ where
/// assert_eq!(join_all(futures).await, [1, 2, 3]);
/// # });
/// ```
-pub fn join_all<I>(i: I) -> JoinAll<I::Item>
+pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
- let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect();
- assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() })
+ #[cfg(futures_no_atomic_cas)]
+ {
+ let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
+ let kind = JoinAllKind::Small { elems };
+ assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ {
+ let iter = iter.into_iter();
+ let kind = match iter.size_hint().1 {
+ None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
+ Some(max) => {
+ if max <= SMALL {
+ let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
+ JoinAllKind::Small { elems }
+ } else {
+ JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
+ }
+ }
+ };
+ assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
+ }
}
impl<F> Future for JoinAll<F>
@@ -95,22 +135,27 @@ where
type Output = Vec<F::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut all_done = true;
+ match &mut self.kind {
+ JoinAllKind::Small { elems } => {
+ let mut all_done = true;
- for elem in iter_pin_mut(self.elems.as_mut()) {
- if elem.poll(cx).is_pending() {
- all_done = false;
- }
- }
+ for elem in iter_pin_mut(elems.as_mut()) {
+ if elem.poll(cx).is_pending() {
+ all_done = false;
+ }
+ }
- if all_done {
- let mut elems = mem::replace(&mut self.elems, Box::pin([]));
- let result = iter_pin_mut(elems.as_mut())
- .map(|e| e.take_output().unwrap())
- .collect();
- Poll::Ready(result)
- } else {
- Poll::Pending
+ if all_done {
+ let mut elems = mem::replace(elems, Box::pin([]));
+ let result =
+ iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
+ Poll::Ready(result)
+ } else {
+ Poll::Pending
+ }
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
}
diff --git a/src/future/lazy.rs b/src/future/lazy.rs
index 42812d3..e9a8cf2 100644
--- a/src/future/lazy.rs
+++ b/src/future/lazy.rs
@@ -7,7 +7,7 @@ use futures_core::task::{Context, Poll};
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Lazy<F> {
- f: Option<F>
+ f: Option<F>,
}
// safe because we never generate `Pin<&mut F>`
@@ -33,19 +33,24 @@ impl<F> Unpin for Lazy<F> {}
/// # });
/// ```
pub fn lazy<F, R>(f: F) -> Lazy<F>
- where F: FnOnce(&mut Context<'_>) -> R,
+where
+ F: FnOnce(&mut Context<'_>) -> R,
{
assert_future::<R, _>(Lazy { f: Some(f) })
}
impl<F, R> FusedFuture for Lazy<F>
- where F: FnOnce(&mut Context<'_>) -> R,
+where
+ F: FnOnce(&mut Context<'_>) -> R,
{
- fn is_terminated(&self) -> bool { self.f.is_none() }
+ fn is_terminated(&self) -> bool {
+ self.f.is_none()
+ }
}
impl<F, R> Future for Lazy<F>
- where F: FnOnce(&mut Context<'_>) -> R,
+where
+ F: FnOnce(&mut Context<'_>) -> R,
{
type Output = R;
diff --git a/src/future/mod.rs b/src/future/mod.rs
index 84e457c..374e365 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -21,7 +21,7 @@ pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj};
#[allow(clippy::module_inception)]
mod future;
pub use self::future::{
- Flatten, Fuse, FutureExt, Inspect, IntoStream, Map, NeverError, Then, UnitError, MapInto,
+ Flatten, Fuse, FutureExt, Inspect, IntoStream, Map, MapInto, NeverError, Then, UnitError,
};
#[deprecated(note = "This is now an alias for [Flatten](Flatten)")]
@@ -40,8 +40,8 @@ pub use self::future::{Shared, WeakShared};
mod try_future;
pub use self::try_future::{
- AndThen, ErrInto, OkInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, OrElse, TryFlattenStream,
- TryFutureExt, UnwrapOrElse, MapOkOrElse, TryFlatten,
+ AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, MapOkOrElse, OkInto,
+ OrElse, TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse,
};
#[cfg(feature = "sink")]
@@ -68,6 +68,9 @@ pub use self::option::OptionFuture;
mod poll_fn;
pub use self::poll_fn::{poll_fn, PollFn};
+mod poll_immediate;
+pub use self::poll_immediate::{poll_immediate, PollImmediate};
+
mod ready;
pub use self::ready::{err, ok, ready, Ready};
@@ -108,12 +111,15 @@ pub use self::select_ok::{select_ok, SelectOk};
mod either;
pub use self::either::Either;
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- mod abortable;
- #[cfg(feature = "alloc")]
- pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted};
-}
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod abortable;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use crate::abortable::{AbortHandle, AbortRegistration, Abortable, Aborted};
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+pub use abortable::abortable;
// Just a helper function to ensure the futures we're returning all have the
// right implementations.
diff --git a/src/future/option.rs b/src/future/option.rs
index 85939d6..0bc3777 100644
--- a/src/future/option.rs
+++ b/src/future/option.rs
@@ -1,7 +1,7 @@
//! Definition of the `Option` (optional step) combinator
use core::pin::Pin;
-use futures_core::future::{Future, FusedFuture};
+use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
@@ -31,13 +31,16 @@ pin_project! {
}
}
+impl<F> Default for OptionFuture<F> {
+ fn default() -> Self {
+ Self { inner: None }
+ }
+}
+
impl<F: Future> Future for OptionFuture<F> {
type Output = Option<F::Output>;
- fn poll(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Self::Output> {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().inner.as_pin_mut() {
Some(x) => x.poll(cx).map(Some),
None => Poll::Ready(None),
diff --git a/src/future/pending.rs b/src/future/pending.rs
index 4311b9a..92c78d5 100644
--- a/src/future/pending.rs
+++ b/src/future/pending.rs
@@ -34,9 +34,7 @@ impl<T> FusedFuture for Pending<T> {
/// # });
/// ```
pub fn pending<T>() -> Pending<T> {
- assert_future::<T, _>(Pending {
- _data: marker::PhantomData,
- })
+ assert_future::<T, _>(Pending { _data: marker::PhantomData })
}
impl<T> Future for Pending<T> {
@@ -47,8 +45,7 @@ impl<T> Future for Pending<T> {
}
}
-impl<T> Unpin for Pending<T> {
-}
+impl<T> Unpin for Pending<T> {}
impl<T> Clone for Pending<T> {
fn clone(&self) -> Self {
diff --git a/src/future/poll_fn.rs b/src/future/poll_fn.rs
index 6ac1ab8..1931157 100644
--- a/src/future/poll_fn.rs
+++ b/src/future/poll_fn.rs
@@ -35,7 +35,7 @@ impl<F> Unpin for PollFn<F> {}
/// ```
pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
- F: FnMut(&mut Context<'_>) -> Poll<T>
+ F: FnMut(&mut Context<'_>) -> Poll<T>,
{
assert_future::<T, _>(PollFn { f })
}
@@ -47,7 +47,8 @@ impl<F> fmt::Debug for PollFn<F> {
}
impl<T, F> Future for PollFn<F>
- where F: FnMut(&mut Context<'_>) -> Poll<T>,
+where
+ F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
diff --git a/src/future/poll_immediate.rs b/src/future/poll_immediate.rs
new file mode 100644
index 0000000..5ae555c
--- /dev/null
+++ b/src/future/poll_immediate.rs
@@ -0,0 +1,126 @@
+use super::assert_future;
+use core::pin::Pin;
+use futures_core::task::{Context, Poll};
+use futures_core::{FusedFuture, Future, Stream};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`poll_immediate`](poll_immediate()) function.
+ ///
+ /// It will never return [Poll::Pending](core::task::Poll::Pending)
+ #[derive(Debug, Clone)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct PollImmediate<T> {
+ #[pin]
+ future: Option<T>
+ }
+}
+
+impl<T, F> Future for PollImmediate<F>
+where
+ F: Future<Output = T>,
+{
+ type Output = Option<T>;
+
+ #[inline]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let mut this = self.project();
+ let inner =
+ this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion");
+ match inner.poll(cx) {
+ Poll::Ready(t) => {
+ this.future.set(None);
+ Poll::Ready(Some(t))
+ }
+ Poll::Pending => Poll::Ready(None),
+ }
+ }
+}
+
+impl<T: Future> FusedFuture for PollImmediate<T> {
+ fn is_terminated(&self) -> bool {
+ self.future.is_none()
+ }
+}
+
+/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done.
+/// The stream will never return [Poll::Pending](core::task::Poll::Pending)
+/// so polling it in a tight loop is worse than using a blocking synchronous function.
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::task::Poll;
+/// use futures::{StreamExt, future, pin_mut};
+/// use future::FusedFuture;
+///
+/// let f = async { 1_u32 };
+/// pin_mut!(f);
+/// let mut r = future::poll_immediate(f);
+/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
+///
+/// let f = async {futures::pending!(); 42_u8};
+/// pin_mut!(f);
+/// let mut p = future::poll_immediate(f);
+/// assert_eq!(p.next().await, Some(Poll::Pending));
+/// assert!(!p.is_terminated());
+/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
+/// assert!(p.is_terminated());
+/// assert_eq!(p.next().await, None);
+/// # });
+/// ```
+impl<T, F> Stream for PollImmediate<F>
+where
+ F: Future<Output = T>,
+{
+ type Item = Poll<T>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ match this.future.as_mut().as_pin_mut() {
+ // inner is gone, so we can signal that the stream is closed.
+ None => Poll::Ready(None),
+ Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| {
+ this.future.set(None);
+ t
+ }))),
+ }
+ }
+}
+
+/// Creates a future that is immediately ready with an Option of a value.
+/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready).
+///
+/// # Caution
+///
+/// When consuming the future by this function, note the following:
+///
+/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value.
+/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let r = future::poll_immediate(async { 1_u32 });
+/// assert_eq!(r.await, Some(1));
+///
+/// let p = future::poll_immediate(future::pending::<i32>());
+/// assert_eq!(p.await, None);
+/// # });
+/// ```
+///
+/// ### Reusing a future
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::{future, pin_mut};
+/// let f = async {futures::pending!(); 42_u8};
+/// pin_mut!(f);
+/// assert_eq!(None, future::poll_immediate(&mut f).await);
+/// assert_eq!(42, f.await);
+/// # });
+/// ```
+pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> {
+ assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) })
+}
diff --git a/src/future/select.rs b/src/future/select.rs
index 043ed17..bd44f20 100644
--- a/src/future/select.rs
+++ b/src/future/select.rs
@@ -1,8 +1,8 @@
use super::assert_future;
+use crate::future::{Either, FutureExt};
use core::pin::Pin;
-use futures_core::future::{Future, FusedFuture};
+use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll};
-use crate::future::{Either, FutureExt};
/// Future for the [`select()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
@@ -37,13 +37,13 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
/// future::Either,
/// future::self,
/// };
-///
+///
/// // These two futures have different types even though their outputs have the same type.
/// let future1 = async {
/// future::pending::<()>().await; // will never finish
/// 1
/// };
-/// let future2 = async {
+/// let future2 = async {
/// future::ready(2).await
/// };
///
@@ -82,9 +82,13 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
/// }
/// ```
pub fn select<A, B>(future1: A, future2: B) -> Select<A, B>
- where A: Future + Unpin, B: Future + Unpin
+where
+ A: Future + Unpin,
+ B: Future + Unpin,
{
- assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select { inner: Some((future1, future2)) })
+ assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
+ inner: Some((future1, future2)),
+ })
}
impl<A, B> Future for Select<A, B>
@@ -104,7 +108,7 @@ where
self.inner = Some((a, b));
Poll::Pending
}
- }
+ },
}
}
}
diff --git a/src/future/select_all.rs b/src/future/select_all.rs
index 0db90a7..106e508 100644
--- a/src/future/select_all.rs
+++ b/src/future/select_all.rs
@@ -1,9 +1,9 @@
use super::assert_future;
use crate::future::FutureExt;
+use alloc::vec::Vec;
use core::iter::FromIterator;
use core::mem;
use core::pin::Pin;
-use alloc::vec::Vec;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
@@ -32,25 +32,29 @@ impl<Fut: Unpin> Unpin for SelectAll<Fut> {}
///
/// This function will panic if the iterator specified contains no items.
pub fn select_all<I>(iter: I) -> SelectAll<I::Item>
- where I: IntoIterator,
- I::Item: Future + Unpin,
+where
+ I: IntoIterator,
+ I::Item: Future + Unpin,
{
- let ret = SelectAll {
- inner: iter.into_iter().collect()
- };
+ let ret = SelectAll { inner: iter.into_iter().collect() };
assert!(!ret.inner.is_empty());
assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret)
}
+impl<Fut> SelectAll<Fut> {
+ /// Consumes this combinator, returning the underlying futures.
+ pub fn into_inner(self) -> Vec<Fut> {
+ self.inner
+ }
+}
+
impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
type Output = (Fut::Output, usize, Vec<Fut>);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| {
- match f.poll_unpin(cx) {
- Poll::Pending => None,
- Poll::Ready(e) => Some((i, e)),
- }
+ let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.poll_unpin(cx) {
+ Poll::Pending => None,
+ Poll::Ready(e) => Some((i, e)),
});
match item {
Some((idx, res)) => {
diff --git a/src/future/select_ok.rs b/src/future/select_ok.rs
index 52d393c..0ad83c6 100644
--- a/src/future/select_ok.rs
+++ b/src/future/select_ok.rs
@@ -1,9 +1,9 @@
use super::assert_future;
use crate::future::TryFutureExt;
+use alloc::vec::Vec;
use core::iter::FromIterator;
use core::mem;
use core::pin::Pin;
-use alloc::vec::Vec;
use futures_core::future::{Future, TryFuture};
use futures_core::task::{Context, Poll};
@@ -30,14 +30,16 @@ impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
///
/// This function will panic if the iterator specified contains no items.
pub fn select_ok<I>(iter: I) -> SelectOk<I::Item>
- where I: IntoIterator,
- I::Item: TryFuture + Unpin,
+where
+ I: IntoIterator,
+ I::Item: TryFuture + Unpin,
{
- let ret = SelectOk {
- inner: iter.into_iter().collect()
- };
+ let ret = SelectOk { inner: iter.into_iter().collect() };
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
- assert_future::<Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>, _>(ret)
+ assert_future::<
+ Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
+ _,
+ >(ret)
}
impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
@@ -46,12 +48,11 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// loop until we've either exhausted all errors, a success was hit, or nothing is ready
loop {
- let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| {
- match f.try_poll_unpin(cx) {
+ let item =
+ self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(e) => Some((i, e)),
- }
- });
+ });
match item {
Some((idx, res)) => {
// always remove Ok or Err, if it's not the last Err continue looping
@@ -59,18 +60,18 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
match res {
Ok(e) => {
let rest = mem::replace(&mut self.inner, Vec::new());
- return Poll::Ready(Ok((e, rest)))
+ return Poll::Ready(Ok((e, rest)));
}
Err(e) => {
if self.inner.is_empty() {
- return Poll::Ready(Err(e))
+ return Poll::Ready(Err(e));
}
}
}
}
None => {
// based on the filter above, nothing is ready, return
- return Poll::Pending
+ return Poll::Pending;
}
}
}
diff --git a/src/future/try_future/into_future.rs b/src/future/try_future/into_future.rs
index e88d603..9f093d0 100644
--- a/src/future/try_future/into_future.rs
+++ b/src/future/try_future/into_future.rs
@@ -21,17 +21,16 @@ impl<Fut> IntoFuture<Fut> {
}
impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> {
- fn is_terminated(&self) -> bool { self.future.is_terminated() }
+ fn is_terminated(&self) -> bool {
+ self.future.is_terminated()
+ }
}
impl<Fut: TryFuture> Future for IntoFuture<Fut> {
type Output = Result<Fut::Ok, Fut::Error>;
#[inline]
- fn poll(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Self::Output> {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().future.try_poll(cx)
}
}
diff --git a/src/future/try_future/try_flatten.rs b/src/future/try_future/try_flatten.rs
index 5241b27..1ce4559 100644
--- a/src/future/try_future/try_flatten.rs
+++ b/src/future/try_future/try_flatten.rs
@@ -2,9 +2,9 @@ use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
pin_project! {
@@ -24,8 +24,9 @@ impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> {
}
impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok>
- where Fut: TryFuture,
- Fut::Ok: TryFuture<Error=Fut::Error>,
+where
+ Fut: TryFuture,
+ Fut::Ok: TryFuture<Error = Fut::Error>,
{
fn is_terminated(&self) -> bool {
match self {
@@ -36,28 +37,27 @@ impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok>
}
impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
- where Fut: TryFuture,
- Fut::Ok: TryFuture<Error=Fut::Error>,
+where
+ Fut: TryFuture,
+ Fut::Ok: TryFuture<Error = Fut::Error>,
{
type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
match self.as_mut().project() {
- TryFlattenProj::First { f } => {
- match ready!(f.try_poll(cx)) {
- Ok(f) => self.set(Self::Second { f }),
- Err(e) => {
- self.set(Self::Empty);
- break Err(e);
- }
+ TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
+ Ok(f) => self.set(Self::Second { f }),
+ Err(e) => {
+ self.set(Self::Empty);
+ break Err(e);
}
},
TryFlattenProj::Second { f } => {
let output = ready!(f.try_poll(cx));
self.set(Self::Empty);
break output;
- },
+ }
TryFlattenProj::Empty => panic!("TryFlatten polled after completion"),
}
})
@@ -65,8 +65,9 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
}
impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok>
- where Fut: TryFuture,
- Fut::Ok: TryStream<Error=Fut::Error>,
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error>,
{
fn is_terminated(&self) -> bool {
match self {
@@ -77,21 +78,20 @@ impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok>
}
impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
- where Fut: TryFuture,
- Fut::Ok: TryStream<Error=Fut::Error>,
+where
+ Fut: TryFuture,
+ Fut::Ok: TryStream<Error = Fut::Error>,
{
type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
match self.as_mut().project() {
- TryFlattenProj::First { f } => {
- match ready!(f.try_poll(cx)) {
- Ok(f) => self.set(Self::Second { f }),
- Err(e) => {
- self.set(Self::Empty);
- break Some(Err(e));
- }
+ TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
+ Ok(f) => self.set(Self::Second { f }),
+ Err(e) => {
+ self.set(Self::Empty);
+ break Some(Err(e));
}
},
TryFlattenProj::Second { f } => {
@@ -100,40 +100,34 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
self.set(Self::Empty);
}
break output;
- },
+ }
TryFlattenProj::Empty => break None,
}
})
}
}
-
#[cfg(feature = "sink")]
impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok>
where
Fut: TryFuture,
- Fut::Ok: Sink<Item, Error=Fut::Error>,
+ Fut::Ok: Sink<Item, Error = Fut::Error>,
{
type Error = Fut::Error;
- fn poll_ready(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(loop {
match self.as_mut().project() {
- TryFlattenProj::First { f } => {
- match ready!(f.try_poll(cx)) {
- Ok(f) => self.set(Self::Second { f }),
- Err(e) => {
- self.set(Self::Empty);
- break Err(e);
- }
+ TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) {
+ Ok(f) => self.set(Self::Second { f }),
+ Err(e) => {
+ self.set(Self::Empty);
+ break Err(e);
}
},
TryFlattenProj::Second { f } => {
break ready!(f.poll_ready(cx));
- },
+ }
TryFlattenProj::Empty => panic!("poll_ready called after eof"),
}
})
@@ -155,10 +149,7 @@ where
}
}
- fn poll_close(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let res = match self.as_mut().project() {
TryFlattenProj::Second { f } => f.poll_close(cx),
_ => Poll::Ready(Ok(())),
diff --git a/src/future/try_future/try_flatten_err.rs b/src/future/try_future/try_flatten_err.rs
index 2e67f11..39b7d9f 100644
--- a/src/future/try_future/try_flatten_err.rs
+++ b/src/future/try_future/try_flatten_err.rs
@@ -21,8 +21,9 @@ impl<Fut1, Fut2> TryFlattenErr<Fut1, Fut2> {
}
impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error>
- where Fut: TryFuture,
- Fut::Error: TryFuture<Ok=Fut::Ok>,
+where
+ Fut: TryFuture,
+ Fut::Error: TryFuture<Ok = Fut::Ok>,
{
fn is_terminated(&self) -> bool {
match self {
@@ -33,28 +34,27 @@ impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error>
}
impl<Fut> Future for TryFlattenErr<Fut, Fut::Error>
- where Fut: TryFuture,
- Fut::Error: TryFuture<Ok=Fut::Ok>,
+where
+ Fut: TryFuture,
+ Fut::Error: TryFuture<Ok = Fut::Ok>,
{
type Output = Result<Fut::Ok, <Fut::Error as TryFuture>::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
match self.as_mut().project() {
- TryFlattenErrProj::First { f } => {
- match ready!(f.try_poll(cx)) {
- Err(f) => self.set(Self::Second { f }),
- Ok(e) => {
- self.set(Self::Empty);
- break Ok(e);
- }
+ TryFlattenErrProj::First { f } => match ready!(f.try_poll(cx)) {
+ Err(f) => self.set(Self::Second { f }),
+ Ok(e) => {
+ self.set(Self::Empty);
+ break Ok(e);
}
},
TryFlattenErrProj::Second { f } => {
let output = ready!(f.try_poll(cx));
self.set(Self::Empty);
break output;
- },
+ }
TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"),
}
})
diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs
index 371f753..29244af 100644
--- a/src/future/try_join_all.rs
+++ b/src/future/try_join_all.rs
@@ -1,14 +1,14 @@
//! Definition of the `TryJoinAll` combinator, waiting for all of a list of
//! futures to finish with either success or error.
+use alloc::boxed::Box;
+use alloc::vec::Vec;
use core::fmt;
use core::future::Future;
use core::iter::FromIterator;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
-use alloc::boxed::Box;
-use alloc::vec::Vec;
use super::{assert_future, TryFuture, TryMaybeDone};
@@ -16,15 +16,13 @@ fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
- unsafe { slice.get_unchecked_mut() }
- .iter_mut()
- .map(|t| unsafe { Pin::new_unchecked(t) })
+ unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
}
enum FinalState<E = ()> {
Pending,
AllDone,
- Error(E)
+ Error(E),
}
/// Future for the [`try_join_all`] function.
@@ -43,9 +41,7 @@ where
F::Error: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TryJoinAll")
- .field("elems", &self.elems)
- .finish()
+ f.debug_struct("TryJoinAll").field("elems", &self.elems).finish()
}
}
@@ -93,9 +89,9 @@ where
I::Item: TryFuture,
{
let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect();
- assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(TryJoinAll {
- elems: elems.into(),
- })
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { elems: elems.into() },
+ )
}
impl<F> Future for TryJoinAll<F>
@@ -110,7 +106,7 @@ where
for elem in iter_pin_mut(self.elems.as_mut()) {
match elem.try_poll(cx) {
Poll::Pending => state = FinalState::Pending,
- Poll::Ready(Ok(())) => {},
+ Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => {
state = FinalState::Error(e);
break;
@@ -122,15 +118,14 @@ where
FinalState::Pending => Poll::Pending,
FinalState::AllDone => {
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
- let results = iter_pin_mut(elems.as_mut())
- .map(|e| e.take_output().unwrap())
- .collect();
+ let results =
+ iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
Poll::Ready(Ok(results))
- },
+ }
FinalState::Error(e) => {
let _ = mem::replace(&mut self.elems, Box::pin([]));
Poll::Ready(Err(e))
- },
+ }
}
}
}
diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs
index dfd2900..24044d2 100644
--- a/src/future/try_maybe_done.rs
+++ b/src/future/try_maybe_done.rs
@@ -49,13 +49,13 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> {
#[inline]
pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> {
match &*self {
- Self::Done(_) => {},
+ Self::Done(_) => {}
Self::Future(_) | Self::Gone => return None,
}
unsafe {
match mem::replace(self.get_unchecked_mut(), Self::Gone) {
TryMaybeDone::Done(output) => Some(output),
- _ => unreachable!()
+ _ => unreachable!(),
}
}
}
@@ -76,16 +76,14 @@ impl<Fut: TryFuture> Future for TryMaybeDone<Fut> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe {
match self.as_mut().get_unchecked_mut() {
- TryMaybeDone::Future(f) => {
- match ready!(Pin::new_unchecked(f).try_poll(cx)) {
- Ok(res) => self.set(Self::Done(res)),
- Err(e) => {
- self.set(Self::Gone);
- return Poll::Ready(Err(e));
- }
+ TryMaybeDone::Future(f) => match ready!(Pin::new_unchecked(f).try_poll(cx)) {
+ Ok(res) => self.set(Self::Done(res)),
+ Err(e) => {
+ self.set(Self::Gone);
+ return Poll::Ready(Err(e));
}
},
- TryMaybeDone::Done(_) => {},
+ TryMaybeDone::Done(_) => {}
TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"),
}
}
diff --git a/src/future/try_select.rs b/src/future/try_select.rs
index b26eed3..4d0b7ff 100644
--- a/src/future/try_select.rs
+++ b/src/future/try_select.rs
@@ -1,7 +1,7 @@
+use crate::future::{Either, TryFutureExt};
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::task::{Context, Poll};
-use crate::future::{Either, TryFutureExt};
/// Future for the [`try_select()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
@@ -48,22 +48,23 @@ impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
/// }
/// ```
pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B>
- where A: TryFuture + Unpin, B: TryFuture + Unpin
+where
+ A: TryFuture + Unpin,
+ B: TryFuture + Unpin,
{
- super::assert_future::<Result<
- Either<(A::Ok, B), (B::Ok, A)>,
- Either<(A::Error, B), (B::Error, A)>,
- >, _>(TrySelect { inner: Some((future1, future2)) })
+ super::assert_future::<
+ Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>,
+ _,
+ >(TrySelect { inner: Some((future1, future2)) })
}
impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
- where A: TryFuture, B: TryFuture
+where
+ A: TryFuture,
+ B: TryFuture,
{
#[allow(clippy::type_complexity)]
- type Output = Result<
- Either<(A::Ok, B), (B::Ok, A)>,
- Either<(A::Error, B), (B::Error, A)>,
- >;
+ type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
@@ -77,7 +78,7 @@ impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
self.inner = Some((a, b));
Poll::Pending
}
- }
+ },
}
}
}