diff options
author | Joel Galenson <jgalenson@google.com> | 2021-05-19 15:35:43 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-05-19 15:35:43 -0700 |
commit | 7a983027b4201001428e686717aad76b0bc2415a (patch) | |
tree | 1be59395194546600d1021ad77556b503bc5e58b /src | |
parent | 4fa481b6ab8716e64dc7db0fb1f2c533a7685183 (diff) | |
download | futures-util-7a983027b4201001428e686717aad76b0bc2415a.tar.gz |
Upgrade rust/crates/futures-util to 0.3.15
Test: make
Change-Id: Ibac5fc0ece362434ec0af181cfd9860570da7813
Diffstat (limited to 'src')
138 files changed, 1972 insertions, 1894 deletions
diff --git a/src/abortable.rs b/src/abortable.rs new file mode 100644 index 0000000..bb82dd0 --- /dev/null +++ b/src/abortable.rs @@ -0,0 +1,185 @@ +use crate::task::AtomicWaker; +use alloc::sync::Arc; +use core::fmt; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, Ordering}; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_core::Stream; +use pin_project_lite::pin_project; + +pin_project! { + /// A future/stream which can be remotely short-circuited using an `AbortHandle`. + #[derive(Debug, Clone)] + #[must_use = "futures/streams do nothing unless you poll them"] + pub struct Abortable<T> { + #[pin] + task: T, + inner: Arc<AbortInner>, + } +} + +impl<T> Abortable<T> { + /// Creates a new `Abortable` future/stream using an existing `AbortRegistration`. + /// `AbortRegistration`s can be acquired through `AbortHandle::new`. + /// + /// When `abort` is called on the handle tied to `reg` or if `abort` has + /// already been called, the future/stream will complete immediately without making + /// any further progress. + /// + /// # Examples: + /// + /// Usage with futures: + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::{Abortable, AbortHandle, Aborted}; + /// + /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); + /// let future = Abortable::new(async { 2 }, abort_registration); + /// abort_handle.abort(); + /// assert_eq!(future.await, Err(Aborted)); + /// # }); + /// ``` + /// + /// Usage with streams: + /// + /// ``` + /// # futures::executor::block_on(async { + /// # use futures::future::{Abortable, AbortHandle}; + /// # use futures::stream::{self, StreamExt}; + /// + /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); + /// let mut stream = Abortable::new(stream::iter(vec![1, 2, 3]), abort_registration); + /// abort_handle.abort(); + /// assert_eq!(stream.next().await, None); + /// # }); + /// ``` + pub fn new(task: T, reg: AbortRegistration) -> Self { + Self { task, inner: reg.inner } + } + + /// Checks whether the task has been aborted. Note that all this + /// method indicates is whether [`AbortHandle::abort`] was *called*. + /// This means that it will return `true` even if: + /// * `abort` was called after the task had completed. + /// * `abort` was called while the task was being polled - the task may still be running and + /// will not be stopped until `poll` returns. + pub fn is_aborted(&self) -> bool { + self.inner.aborted.load(Ordering::Relaxed) + } +} + +/// A registration handle for an `Abortable` task. +/// Values of this type can be acquired from `AbortHandle::new` and are used +/// in calls to `Abortable::new`. +#[derive(Debug)] +pub struct AbortRegistration { + inner: Arc<AbortInner>, +} + +/// A handle to an `Abortable` task. +#[derive(Debug, Clone)] +pub struct AbortHandle { + inner: Arc<AbortInner>, +} + +impl AbortHandle { + /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used + /// to abort a running future or stream. + /// + /// This function is usually paired with a call to [`Abortable::new`]. + pub fn new_pair() -> (Self, AbortRegistration) { + let inner = + Arc::new(AbortInner { waker: AtomicWaker::new(), aborted: AtomicBool::new(false) }); + + (Self { inner: inner.clone() }, AbortRegistration { inner }) + } +} + +// Inner type storing the waker to awaken and a bool indicating that it +// should be aborted. +#[derive(Debug)] +struct AbortInner { + waker: AtomicWaker, + aborted: AtomicBool, +} + +/// Indicator that the `Abortable` task was aborted. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct Aborted; + +impl fmt::Display for Aborted { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "`Abortable` future has been aborted") + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Aborted {} + +impl<T> Abortable<T> { + fn try_poll<I>( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + poll: impl Fn(Pin<&mut T>, &mut Context<'_>) -> Poll<I>, + ) -> Poll<Result<I, Aborted>> { + // Check if the task has been aborted + if self.is_aborted() { + return Poll::Ready(Err(Aborted)); + } + + // attempt to complete the task + if let Poll::Ready(x) = poll(self.as_mut().project().task, cx) { + return Poll::Ready(Ok(x)); + } + + // Register to receive a wakeup if the task is aborted in the future + self.inner.waker.register(cx.waker()); + + // Check to see if the task was aborted between the first check and + // registration. + // Checking with `is_aborted` which uses `Relaxed` is sufficient because + // `register` introduces an `AcqRel` barrier. + if self.is_aborted() { + return Poll::Ready(Err(Aborted)); + } + + Poll::Pending + } +} + +impl<Fut> Future for Abortable<Fut> +where + Fut: Future, +{ + type Output = Result<Fut::Output, Aborted>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.try_poll(cx, |fut, cx| fut.poll(cx)) + } +} + +impl<St> Stream for Abortable<St> +where + St: Stream, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.try_poll(cx, |stream, cx| stream.poll_next(cx)).map(Result::ok).map(Option::flatten) + } +} + +impl AbortHandle { + /// Abort the `Abortable` stream/future associated with this handle. + /// + /// Notifies the Abortable task associated with this handle that it + /// should abort. Note that if the task is currently being polled on + /// another thread, it will not immediately stop running. Instead, it will + /// continue to run until its poll method returns. + pub fn abort(&self) { + self.inner.aborted.store(true, Ordering::Relaxed); + self.inner.waker.wake(); + } +} diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs index 965d9fb..c5cdd9b 100644 --- a/src/async_await/join_mod.rs +++ b/src/async_await/join_mod.rs @@ -1,7 +1,5 @@ //! The `join` macro. -use proc_macro_hack::proc_macro_hack; - macro_rules! document_join_macro { ($join:item $try_join:item) => { /// Polls multiple futures simultaneously, returning a tuple @@ -81,12 +79,14 @@ macro_rules! document_join_macro { } } +#[allow(unreachable_pub)] #[doc(hidden)] -#[proc_macro_hack(support_nested, only_hack_old_rustc)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::join_internal; +#[allow(unreachable_pub)] #[doc(hidden)] -#[proc_macro_hack(support_nested, only_hack_old_rustc)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::try_join_internal; document_join_macro! { diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs index bdaed95..5f5d4ac 100644 --- a/src/async_await/mod.rs +++ b/src/async_await/mod.rs @@ -3,8 +3,8 @@ //! This module contains a number of functions and combinators for working //! with `async`/`await` code. -use futures_core::future::{Future, FusedFuture}; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::future::{FusedFuture, Future}; +use futures_core::stream::{FusedStream, Stream}; #[macro_use] mod poll; diff --git a/src/async_await/pending.rs b/src/async_await/pending.rs index e0cf341..5d7a431 100644 --- a/src/async_await/pending.rs +++ b/src/async_await/pending.rs @@ -16,7 +16,7 @@ use futures_core::task::{Context, Poll}; macro_rules! pending { () => { $crate::__private::async_await::pending_once().await - } + }; } #[doc(hidden)] diff --git a/src/async_await/poll.rs b/src/async_await/poll.rs index b5782df..b62f45a 100644 --- a/src/async_await/poll.rs +++ b/src/async_await/poll.rs @@ -17,7 +17,7 @@ use futures_core::task::{Context, Poll}; macro_rules! poll { ($x:expr $(,)?) => { $crate::__private::async_await::poll($x).await - } + }; } #[doc(hidden)] diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs index 59bca08..37e938d 100644 --- a/src/async_await/select_mod.rs +++ b/src/async_await/select_mod.rs @@ -1,7 +1,5 @@ //! The `select` macro. -use proc_macro_hack::proc_macro_hack; - macro_rules! document_select_macro { // This branch is required for `futures 0.3.1`, from before select_biased was introduced ($select:item) => { @@ -309,12 +307,14 @@ macro_rules! document_select_macro { } #[cfg(feature = "std")] +#[allow(unreachable_pub)] #[doc(hidden)] -#[proc_macro_hack(support_nested, only_hack_old_rustc)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::select_internal; +#[allow(unreachable_pub)] #[doc(hidden)] -#[proc_macro_hack(support_nested, only_hack_old_rustc)] +#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))] pub use futures_macro::select_biased_internal; document_select_macro! { diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs index bc3aee3..17239a4 100644 --- a/src/compat/compat01as03.rs +++ b/src/compat/compat01as03.rs @@ -1,18 +1,15 @@ use futures_01::executor::{ - spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, - Spawn as Spawn01, UnsafeNotify as UnsafeNotify01, -}; -use futures_01::{ - Async as Async01, Future as Future01, - Stream as Stream01, + spawn as spawn01, Notify as Notify01, NotifyHandle as NotifyHandle01, Spawn as Spawn01, + UnsafeNotify as UnsafeNotify01, }; +use futures_01::{Async as Async01, Future as Future01, Stream as Stream01}; #[cfg(feature = "sink")] use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01}; -use futures_core::{task as task03, future::Future as Future03, stream::Stream as Stream03}; -use std::pin::Pin; -use std::task::Context; +use futures_core::{future::Future as Future03, stream::Stream as Stream03, task as task03}; #[cfg(feature = "sink")] use futures_sink::Sink as Sink03; +use std::pin::Pin; +use std::task::Context; #[cfg(feature = "io-compat")] #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] @@ -33,9 +30,7 @@ impl<T> Compat01As03<T> { /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite /// object in a futures 0.3-compatible wrapper. pub fn new(object: T) -> Self { - Self { - inner: spawn01(object), - } + Self { inner: spawn01(object) } } fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R { @@ -157,10 +152,7 @@ fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> { impl<Fut: Future01> Future03 for Compat01As03<Fut> { type Output = Result<Fut::Item, Fut::Error>; - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> task03::Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output> { poll_01_to_03(self.in_notify(cx, Future01::poll)) } } @@ -198,18 +190,10 @@ impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {} impl<S, SinkItem> Compat01As03Sink<S, SinkItem> { /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper. pub fn new(inner: S) -> Self { - Self { - inner: spawn01(inner), - buffer: None, - close_started: false - } + Self { inner: spawn01(inner), buffer: None, close_started: false } } - fn in_notify<R>( - &mut self, - cx: &mut Context<'_>, - f: impl FnOnce(&mut S) -> R, - ) -> R { + fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut S) -> R) -> R { let notify = &WakerToHandle(cx.waker()); self.inner.poll_fn_notify(notify, 0, f) } @@ -256,10 +240,7 @@ where { type Error = S::SinkError; - fn start_send( - mut self: Pin<&mut Self>, - item: SinkItem, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> { debug_assert!(self.buffer.is_none()); self.buffer = Some(item); Ok(()) @@ -289,9 +270,7 @@ where match self.in_notify(cx, |f| match item { Some(i) => match f.start_send(i)? { AsyncSink01::Ready => f.poll_complete().map(|i| (i, None)), - AsyncSink01::NotReady(t) => { - Ok((Async01::NotReady, Some(t))) - } + AsyncSink01::NotReady(t) => Ok((Async01::NotReady, Some(t))), }, None => f.poll_complete().map(|i| (i, None)), })? { @@ -447,29 +426,35 @@ mod io { } } - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> task03::Poll<Result<usize, Error>> - { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> task03::Poll<Result<usize, Error>> { poll_01_to_03(self.in_notify(cx, |x| x.poll_read(buf))) } } impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) - -> task03::Poll<Result<usize, Error>> - { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> task03::Poll<Result<usize, Error>> { poll_01_to_03(self.in_notify(cx, |x| x.poll_write(buf))) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> task03::Poll<Result<(), Error>> - { + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> task03::Poll<Result<(), Error>> { poll_01_to_03(self.in_notify(cx, AsyncWrite01::poll_flush)) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> task03::Poll<Result<(), Error>> - { + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> task03::Poll<Result<(), Error>> { poll_01_to_03(self.in_notify(cx, AsyncWrite01::shutdown)) } } diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs index 3f1eebb..2573fe7 100644 --- a/src/compat/compat03as01.rs +++ b/src/compat/compat03as01.rs @@ -1,31 +1,19 @@ +use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef}; use futures_01::{ - task as task01, Async as Async01, Future as Future01, Poll as Poll01, - Stream as Stream01, + task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01, }; #[cfg(feature = "sink")] -use futures_01::{ - AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01, -}; +use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01}; use futures_core::{ - task::{RawWaker, RawWakerVTable}, future::TryFuture as TryFuture03, stream::TryStream as TryStream03, + task::{RawWaker, RawWakerVTable}, }; #[cfg(feature = "sink")] use futures_sink::Sink as Sink03; -use crate::task::{ - self as task03, - ArcWake as ArcWake03, - WakerRef, -}; #[cfg(feature = "sink")] use std::marker::PhantomData; -use std::{ - mem, - pin::Pin, - sync::Arc, - task::Context, -}; +use std::{mem, pin::Pin, sync::Arc, task::Context}; /// Converts a futures 0.3 [`TryFuture`](futures_core::future::TryFuture) or /// [`TryStream`](futures_core::stream::TryStream) into a futures 0.1 @@ -80,10 +68,7 @@ impl<T> Compat<T> { impl<T, Item> CompatSink<T, Item> { /// Creates a new [`CompatSink`]. pub fn new(inner: T) -> Self { - Self { - inner, - _phantom: PhantomData, - } + Self { inner, _phantom: PhantomData } } /// Get a reference to 0.3 Sink contained within. @@ -102,9 +87,7 @@ impl<T, Item> CompatSink<T, Item> { } } -fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) - -> Result<Async01<T>, E> -{ +fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> { match x? { task03::Poll::Ready(t) => Ok(Async01::Ready(t)), task03::Poll::Pending => Ok(Async01::NotReady), @@ -147,17 +130,10 @@ where type SinkItem = Item; type SinkError = T::Error; - fn start_send( - &mut self, - item: Self::SinkItem, - ) -> StartSend01<Self::SinkItem, Self::SinkError> { - with_sink_context(self, |mut inner, cx| { - match inner.as_mut().poll_ready(cx)? { - task03::Poll::Ready(()) => { - inner.start_send(item).map(|()| AsyncSink01::Ready) - } - task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)), - } + fn start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError> { + with_sink_context(self, |mut inner, cx| match inner.as_mut().poll_ready(cx)? { + task03::Poll::Ready(()) => inner.start_send(item).map(|()| AsyncSink01::Ready), + task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)), }) } @@ -190,9 +166,9 @@ impl Current { // Lazily create the `Arc` only when the waker is actually cloned. // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion // function is landed in `core`. - mem::transmute::<task03::Waker, RawWaker>( - task03::waker(Arc::new(ptr_to_current(ptr).clone())) - ) + mem::transmute::<task03::Waker, RawWaker>(task03::waker(Arc::new( + ptr_to_current(ptr).clone(), + ))) } unsafe fn drop(_: *const ()) {} unsafe fn wake(ptr: *const ()) { @@ -243,9 +219,7 @@ mod io { use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03}; use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01}; - fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) - -> Result<T, std::io::Error> - { + fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> { match x { task03::Poll::Ready(Ok(t)) => Ok(t), task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()), diff --git a/src/compat/executor.rs b/src/compat/executor.rs index 82cb496..e25705b 100644 --- a/src/compat/executor.rs +++ b/src/compat/executor.rs @@ -66,9 +66,7 @@ where fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError03> { let future = future.unit_error().compat(); - self.executor01 - .execute(future) - .map_err(|_| SpawnError03::shutdown()) + self.executor01.execute(future).map_err(|_| SpawnError03::shutdown()) } } diff --git a/src/compat/mod.rs b/src/compat/mod.rs index c5edcc5..4812803 100644 --- a/src/compat/mod.rs +++ b/src/compat/mod.rs @@ -4,16 +4,16 @@ //! library is activated. mod executor; -pub use self::executor::{Executor01CompatExt, Executor01Future, Executor01As03}; +pub use self::executor::{Executor01As03, Executor01CompatExt, Executor01Future}; mod compat01as03; +#[cfg(feature = "io-compat")] +#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] +pub use self::compat01as03::{AsyncRead01CompatExt, AsyncWrite01CompatExt}; pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt}; #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::compat01as03::{Compat01As03Sink, Sink01CompatExt}; -#[cfg(feature = "io-compat")] -#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] -pub use self::compat01as03::{AsyncRead01CompatExt, AsyncWrite01CompatExt}; mod compat03as01; pub use self::compat03as01::Compat; @@ -1,5 +1,5 @@ -use core::marker::PhantomData; use core::fmt::{self, Debug}; +use core::marker::PhantomData; pub trait FnOnce1<A> { type Output; @@ -8,7 +8,7 @@ pub trait FnOnce1<A> { impl<T, A, R> FnOnce1<A> for T where - T: FnOnce(A) -> R + T: FnOnce(A) -> R, { type Output = R; fn call_once(self, arg: A) -> R { @@ -22,7 +22,7 @@ pub trait FnMut1<A>: FnOnce1<A> { impl<T, A, R> FnMut1<A> for T where - T: FnMut(A) -> R + T: FnMut(A) -> R, { fn call_mut(&mut self, arg: A) -> R { self(arg) @@ -37,7 +37,7 @@ pub trait Fn1<A>: FnMut1<A> { impl<T, A, R> Fn1<A> for T where - T: Fn(A) -> R + T: Fn(A) -> R, { fn call(&self, arg: A) -> R { self(arg) @@ -143,7 +143,7 @@ pub struct InspectFn<F>(F); #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<F, A> FnOnce1<A> for InspectFn<F> where - F: for<'a> FnOnce1<&'a A, Output=()>, + F: for<'a> FnOnce1<&'a A, Output = ()>, { type Output = A; fn call_once(self, arg: A) -> Self::Output { @@ -154,7 +154,7 @@ where #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<F, A> FnMut1<A> for InspectFn<F> where - F: for<'a> FnMut1<&'a A, Output=()>, + F: for<'a> FnMut1<&'a A, Output = ()>, { fn call_mut(&mut self, arg: A) -> Self::Output { self.0.call_mut(&arg); @@ -164,7 +164,7 @@ where #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<F, A> Fn1<A> for InspectFn<F> where - F: for<'a> Fn1<&'a A, Output=()>, + F: for<'a> Fn1<&'a A, Output = ()>, { fn call(&self, arg: A) -> Self::Output { self.0.call(&arg); @@ -244,27 +244,33 @@ pub struct InspectOkFn<F>(F); impl<'a, F, T, E> FnOnce1<&'a Result<T, E>> for InspectOkFn<F> where - F: FnOnce1<&'a T, Output=()> + F: FnOnce1<&'a T, Output = ()>, { type Output = (); fn call_once(self, arg: &'a Result<T, E>) -> Self::Output { - if let Ok(x) = arg { self.0.call_once(x) } + if let Ok(x) = arg { + self.0.call_once(x) + } } } impl<'a, F, T, E> FnMut1<&'a Result<T, E>> for InspectOkFn<F> where - F: FnMut1<&'a T, Output=()>, + F: FnMut1<&'a T, Output = ()>, { fn call_mut(&mut self, arg: &'a Result<T, E>) -> Self::Output { - if let Ok(x) = arg { self.0.call_mut(x) } + if let Ok(x) = arg { + self.0.call_mut(x) + } } } impl<'a, F, T, E> Fn1<&'a Result<T, E>> for InspectOkFn<F> where - F: Fn1<&'a T, Output=()>, + F: Fn1<&'a T, Output = ()>, { fn call(&self, arg: &'a Result<T, E>) -> Self::Output { - if let Ok(x) = arg { self.0.call(x) } + if let Ok(x) = arg { + self.0.call(x) + } } } pub(crate) fn inspect_ok_fn<F>(f: F) -> InspectOkFn<F> { @@ -276,27 +282,33 @@ pub struct InspectErrFn<F>(F); impl<'a, F, T, E> FnOnce1<&'a Result<T, E>> for InspectErrFn<F> where - F: FnOnce1<&'a E, Output=()> + F: FnOnce1<&'a E, Output = ()>, { type Output = (); fn call_once(self, arg: &'a Result<T, E>) -> Self::Output { - if let Err(x) = arg { self.0.call_once(x) } + if let Err(x) = arg { + self.0.call_once(x) + } } } impl<'a, F, T, E> FnMut1<&'a Result<T, E>> for InspectErrFn<F> where - F: FnMut1<&'a E, Output=()>, + F: FnMut1<&'a E, Output = ()>, { fn call_mut(&mut self, arg: &'a Result<T, E>) -> Self::Output { - if let Err(x) = arg { self.0.call_mut(x) } + if let Err(x) = arg { + self.0.call_mut(x) + } } } impl<'a, F, T, E> Fn1<&'a Result<T, E>> for InspectErrFn<F> where - F: Fn1<&'a E, Output=()>, + F: Fn1<&'a E, Output = ()>, { fn call(&self, arg: &'a Result<T, E>) -> Self::Output { - if let Err(x) = arg { self.0.call(x) } + if let Err(x) = arg { + self.0.call(x) + } } } pub(crate) fn inspect_err_fn<F>(f: F) -> InspectErrFn<F> { @@ -313,7 +325,7 @@ pub struct UnwrapOrElseFn<F>(F); impl<F, T, E> FnOnce1<Result<T, E>> for UnwrapOrElseFn<F> where - F: FnOnce1<E, Output=T>, + F: FnOnce1<E, Output = T>, { type Output = T; fn call_once(self, arg: Result<T, E>) -> Self::Output { @@ -322,7 +334,7 @@ where } impl<F, T, E> FnMut1<Result<T, E>> for UnwrapOrElseFn<F> where - F: FnMut1<E, Output=T>, + F: FnMut1<E, Output = T>, { fn call_mut(&mut self, arg: Result<T, E>) -> Self::Output { arg.unwrap_or_else(|x| self.0.call_mut(x)) @@ -330,7 +342,7 @@ where } impl<F, T, E> Fn1<Result<T, E>> for UnwrapOrElseFn<F> where - F: Fn1<E, Output=T>, + F: Fn1<E, Output = T>, { fn call(&self, arg: Result<T, E>) -> Self::Output { arg.unwrap_or_else(|x| self.0.call(x)) @@ -347,7 +359,10 @@ impl<T> Default for IntoFn<T> { Self(PhantomData) } } -impl<A, T> FnOnce1<A> for IntoFn<T> where A: Into<T> { +impl<A, T> FnOnce1<A> for IntoFn<T> +where + A: Into<T>, +{ type Output = T; fn call_once(self, arg: A) -> Self::Output { arg.into() diff --git a/src/future/abortable.rs b/src/future/abortable.rs index 3f2e5a0..d017ab7 100644 --- a/src/future/abortable.rs +++ b/src/future/abortable.rs @@ -1,110 +1,8 @@ use super::assert_future; -use crate::task::AtomicWaker; +use crate::future::{AbortHandle, Abortable, Aborted}; use futures_core::future::Future; -use futures_core::task::{Context, Poll}; -use core::fmt; -use core::pin::Pin; -use core::sync::atomic::{AtomicBool, Ordering}; -use alloc::sync::Arc; -use pin_project_lite::pin_project; -pin_project! { - /// A future which can be remotely short-circuited using an `AbortHandle`. - #[derive(Debug, Clone)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct Abortable<Fut> { - #[pin] - future: Fut, - inner: Arc<AbortInner>, - } -} - -impl<Fut> Abortable<Fut> where Fut: Future { - /// Creates a new `Abortable` future using an existing `AbortRegistration`. - /// `AbortRegistration`s can be acquired through `AbortHandle::new`. - /// - /// When `abort` is called on the handle tied to `reg` or if `abort` has - /// already been called, the future will complete immediately without making - /// any further progress. - /// - /// Example: - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::future::{Abortable, AbortHandle, Aborted}; - /// - /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); - /// let future = Abortable::new(async { 2 }, abort_registration); - /// abort_handle.abort(); - /// assert_eq!(future.await, Err(Aborted)); - /// # }); - /// ``` - pub fn new(future: Fut, reg: AbortRegistration) -> Self { - assert_future::<Result<Fut::Output, Aborted>, _>(Self { - future, - inner: reg.inner, - }) - } -} - -/// A registration handle for a `Abortable` future. -/// Values of this type can be acquired from `AbortHandle::new` and are used -/// in calls to `Abortable::new`. -#[derive(Debug)] -pub struct AbortRegistration { - inner: Arc<AbortInner>, -} - -/// A handle to a `Abortable` future. -#[derive(Debug, Clone)] -pub struct AbortHandle { - inner: Arc<AbortInner>, -} - -impl AbortHandle { - /// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used - /// to abort a running future. - /// - /// This function is usually paired with a call to `Abortable::new`. - /// - /// Example: - /// - /// ``` - /// # futures::executor::block_on(async { - /// use futures::future::{Abortable, AbortHandle, Aborted}; - /// - /// let (abort_handle, abort_registration) = AbortHandle::new_pair(); - /// let future = Abortable::new(async { 2 }, abort_registration); - /// abort_handle.abort(); - /// assert_eq!(future.await, Err(Aborted)); - /// # }); - /// ``` - pub fn new_pair() -> (Self, AbortRegistration) { - let inner = Arc::new(AbortInner { - waker: AtomicWaker::new(), - cancel: AtomicBool::new(false), - }); - - ( - Self { - inner: inner.clone(), - }, - AbortRegistration { - inner, - }, - ) - } -} - -// Inner type storing the waker to awaken and a bool indicating that it -// should be cancelled. -#[derive(Debug)] -struct AbortInner { - waker: AtomicWaker, - cancel: AtomicBool, -} - -/// Creates a new `Abortable` future and a `AbortHandle` which can be used to stop it. +/// Creates a new `Abortable` future and an `AbortHandle` which can be used to stop it. /// /// This function is a convenient (but less flexible) alternative to calling /// `AbortHandle::new` and `Abortable::new` manually. @@ -112,66 +10,10 @@ struct AbortInner { /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. pub fn abortable<Fut>(future: Fut) -> (Abortable<Fut>, AbortHandle) - where Fut: Future +where + Fut: Future, { let (handle, reg) = AbortHandle::new_pair(); - ( - Abortable::new(future, reg), - handle, - ) -} - -/// Indicator that the `Abortable` future was aborted. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub struct Aborted; - -impl fmt::Display for Aborted { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "`Abortable` future has been aborted") - } -} - -#[cfg(feature = "std")] -impl std::error::Error for Aborted {} - -impl<Fut> Future for Abortable<Fut> where Fut: Future { - type Output = Result<Fut::Output, Aborted>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - // Check if the future has been aborted - if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)) - } - - // attempt to complete the future - if let Poll::Ready(x) = self.as_mut().project().future.poll(cx) { - return Poll::Ready(Ok(x)) - } - - // Register to receive a wakeup if the future is aborted in the... future - self.inner.waker.register(cx.waker()); - - // Check to see if the future was aborted between the first check and - // registration. - // Checking with `Relaxed` is sufficient because `register` introduces an - // `AcqRel` barrier. - if self.inner.cancel.load(Ordering::Relaxed) { - return Poll::Ready(Err(Aborted)) - } - - Poll::Pending - } -} - -impl AbortHandle { - /// Abort the `Abortable` future associated with this handle. - /// - /// Notifies the Abortable future associated with this handle that it - /// should abort. Note that if the future is currently being polled on - /// another thread, it will not immediately stop running. Instead, it will - /// continue to run until its poll method returns. - pub fn abort(&self) { - self.inner.cancel.store(true, Ordering::Relaxed); - self.inner.waker.wake(); - } + let abortable = assert_future::<Result<Fut::Output, Aborted>, _>(Abortable::new(future, reg)); + (abortable, handle) } diff --git a/src/future/either.rs b/src/future/either.rs index 5f5b614..35650da 100644 --- a/src/future/either.rs +++ b/src/future/either.rs @@ -5,8 +5,25 @@ use futures_core::stream::{FusedStream, Stream}; #[cfg(feature = "sink")] use futures_sink::Sink; -/// Combines two different futures, streams, or sinks having the same associated types into a single -/// type. +/// Combines two different futures, streams, or sinks having the same associated types into a single type. +/// +/// This is useful when conditionally choosing between two distinct future types: +/// +/// ```rust +/// use futures::future::Either; +/// +/// # futures::executor::block_on(async { +/// let cond = true; +/// +/// let fut = if cond { +/// Either::Left(async move { 12 }) +/// } else { +/// Either::Right(async move { 44 }) +/// }; +/// +/// assert_eq!(fut.await, 12); +/// # }) +/// ``` #[derive(Debug, Clone)] pub enum Either<A, B> { /// First branch of the type diff --git a/src/future/future/catch_unwind.rs b/src/future/future/catch_unwind.rs index 3f16577..0e09d6e 100644 --- a/src/future/future/catch_unwind.rs +++ b/src/future/future/catch_unwind.rs @@ -1,6 +1,6 @@ use core::any::Any; use core::pin::Pin; -use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; +use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}; use futures_core::future::Future; use futures_core::task::{Context, Poll}; @@ -16,14 +16,18 @@ pin_project! { } } -impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe { +impl<Fut> CatchUnwind<Fut> +where + Fut: Future + UnwindSafe, +{ pub(super) fn new(future: Fut) -> Self { Self { future } } } impl<Fut> Future for CatchUnwind<Fut> - where Fut: Future + UnwindSafe, +where + Fut: Future + UnwindSafe, { type Output = Result<Fut::Output, Box<dyn Any + Send>>; diff --git a/src/future/future/flatten.rs b/src/future/future/flatten.rs index 0c48a4f..bd767af 100644 --- a/src/future/future/flatten.rs +++ b/src/future/future/flatten.rs @@ -2,9 +2,9 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { @@ -24,8 +24,9 @@ impl<Fut1, Fut2> Flatten<Fut1, Fut2> { } impl<Fut> FusedFuture for Flatten<Fut, Fut::Output> - where Fut: Future, - Fut::Output: Future, +where + Fut: Future, + Fut::Output: Future, { fn is_terminated(&self) -> bool { match self { @@ -36,8 +37,9 @@ impl<Fut> FusedFuture for Flatten<Fut, Fut::Output> } impl<Fut> Future for Flatten<Fut, Fut::Output> - where Fut: Future, - Fut::Output: Future, +where + Fut: Future, + Fut::Output: Future, { type Output = <Fut::Output as Future>::Output; @@ -47,12 +49,12 @@ impl<Fut> Future for Flatten<Fut, Fut::Output> FlattenProj::First { f } => { let f = ready!(f.poll(cx)); self.set(Self::Second { f }); - }, + } FlattenProj::Second { f } => { let output = ready!(f.poll(cx)); self.set(Self::Empty); break output; - }, + } FlattenProj::Empty => panic!("Flatten polled after completion"), } }) @@ -60,8 +62,9 @@ impl<Fut> Future for Flatten<Fut, Fut::Output> } impl<Fut> FusedStream for Flatten<Fut, Fut::Output> - where Fut: Future, - Fut::Output: Stream, +where + Fut: Future, + Fut::Output: Stream, { fn is_terminated(&self) -> bool { match self { @@ -72,32 +75,32 @@ impl<Fut> FusedStream for Flatten<Fut, Fut::Output> } impl<Fut> Stream for Flatten<Fut, Fut::Output> - where Fut: Future, - Fut::Output: Stream, +where + Fut: Future, + Fut::Output: Stream, { type Item = <Fut::Output as Stream>::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(loop { match self.as_mut().project() { - FlattenProj::First { f } => { + FlattenProj::First { f } => { let f = ready!(f.poll(cx)); self.set(Self::Second { f }); - }, + } FlattenProj::Second { f } => { let output = ready!(f.poll_next(cx)); if output.is_none() { self.set(Self::Empty); } break output; - }, + } FlattenProj::Empty => break None, } }) } } - #[cfg(feature = "sink")] impl<Fut, Item> Sink<Item> for Flatten<Fut, Fut::Output> where @@ -106,19 +109,16 @@ where { type Error = <Fut::Output as Sink<Item>>::Error; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(loop { match self.as_mut().project() { FlattenProj::First { f } => { let f = ready!(f.poll(cx)); self.set(Self::Second { f }); - }, + } FlattenProj::Second { f } => { break ready!(f.poll_ready(cx)); - }, + } FlattenProj::Empty => panic!("poll_ready called after eof"), } }) @@ -140,10 +140,7 @@ where } } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { let res = match self.as_mut().project() { FlattenProj::Second { f } => f.poll_close(cx), _ => Poll::Ready(Ok(())), diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs index f4284ba..597aec1 100644 --- a/src/future/future/fuse.rs +++ b/src/future/future/fuse.rs @@ -1,5 +1,5 @@ use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::ready; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -86,7 +86,7 @@ impl<Fut: Future> Future for Fuse<Fut> { let output = ready!(fut.poll(cx)); self.project().inner.set(None); output - }, + } None => return Poll::Pending, }) } diff --git a/src/future/future/remote_handle.rs b/src/future/future/remote_handle.rs index 0d33ea5..1358902 100644 --- a/src/future/future/remote_handle.rs +++ b/src/future/future/remote_handle.rs @@ -1,23 +1,23 @@ use { crate::future::{CatchUnwind, FutureExt}, - futures_channel::oneshot::{self, Sender, Receiver}, + futures_channel::oneshot::{self, Receiver, Sender}, futures_core::{ future::Future, - task::{Context, Poll}, ready, + task::{Context, Poll}, }, + pin_project_lite::pin_project, std::{ any::Any, fmt, panic::{self, AssertUnwindSafe}, pin::Pin, sync::{ - Arc, atomic::{AtomicBool, Ordering}, + Arc, }, thread, }, - pin_project_lite::pin_project, }; /// The handle to a remote future returned by @@ -36,7 +36,7 @@ use { /// must be careful with regard to unwind safety because the thread in which the future /// is polled will keep running after the panic and the thread running the [RemoteHandle] /// will unwind. -#[must_use = "futures do nothing unless you `.await` or poll them"] +#[must_use = "dropping a remote handle cancels the underlying future"] #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] pub struct RemoteHandle<T> { @@ -85,9 +85,7 @@ pin_project! { impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("Remote") - .field(&self.future) - .finish() + f.debug_tuple("Remote").field(&self.future).finish() } } diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs index 74311a0..9b31932 100644 --- a/src/future/future/shared.rs +++ b/src/future/future/shared.rs @@ -29,6 +29,12 @@ struct Notifier { /// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`. pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>); +impl<Fut: Future> Clone for WeakShared<Fut> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + // The future itself is polled behind the `Arc`, so it won't be moved // when `Shared` is moved. impl<Fut: Future> Unpin for Shared<Fut> {} @@ -90,10 +96,7 @@ impl<Fut: Future> Shared<Fut> { }), }; - Self { - inner: Some(Arc::new(inner)), - waker_key: NULL_WAKER_KEY, - } + Self { inner: Some(Arc::new(inner)), waker_key: NULL_WAKER_KEY } } } @@ -223,10 +226,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = &mut *self; - let inner = this - .inner - .take() - .expect("Shared future polled again after completion"); + let inner = this.inner.take().expect("Shared future polled again after completion"); // Fast path for when the wrapped future has already completed if inner.notifier.state.load(Acquire) == COMPLETE { @@ -286,11 +286,7 @@ where match future.poll(&mut cx) { Poll::Pending => { - if inner - .notifier - .state - .compare_exchange(POLLING, IDLE, SeqCst, SeqCst) - .is_ok() + if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() { // Success drop(_reset); @@ -330,10 +326,7 @@ where Fut: Future, { fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - waker_key: NULL_WAKER_KEY, - } + Self { inner: self.inner.clone(), waker_key: NULL_WAKER_KEY } } } @@ -367,16 +360,12 @@ impl ArcWake for Notifier { } } -impl<Fut: Future> WeakShared<Fut> -{ +impl<Fut: Future> WeakShared<Fut> { /// Attempts to upgrade this [`WeakShared`] into a [`Shared`]. /// /// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled /// to completion. pub fn upgrade(&self) -> Option<Shared<Fut>> { - Some(Shared { - inner: Some(self.0.upgrade()?), - waker_key: NULL_WAKER_KEY, - }) + Some(Shared { inner: Some(self.0.upgrade()?), waker_key: NULL_WAKER_KEY }) } } diff --git a/src/future/join.rs b/src/future/join.rs index a818343..740ffbc 100644 --- a/src/future/join.rs +++ b/src/future/join.rs @@ -213,14 +213,5 @@ where Fut5: Future, { let f = Join5::new(future1, future2, future3, future4, future5); - assert_future::< - ( - Fut1::Output, - Fut2::Output, - Fut3::Output, - Fut4::Output, - Fut5::Output, - ), - _, - >(f) + assert_future::<(Fut1::Output, Fut2::Output, Fut3::Output, Fut4::Output, Fut5::Output), _>(f) } diff --git a/src/future/join_all.rs b/src/future/join_all.rs index 7ccf869..427e71c 100644 --- a/src/future/join_all.rs +++ b/src/future/join_all.rs @@ -1,24 +1,22 @@ //! Definition of the `JoinAll` combinator, waiting for all of a list of futures //! to finish. +use alloc::boxed::Box; +use alloc::vec::Vec; use core::fmt; use core::future::Future; use core::iter::FromIterator; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use alloc::boxed::Box; -use alloc::vec::Vec; -use super::{MaybeDone, assert_future}; +use super::{assert_future, MaybeDone}; fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() } - .iter_mut() - .map(|t| unsafe { Pin::new_unchecked(t) }) + unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) } /// Future for the [`join_all`] function. @@ -36,9 +34,7 @@ where F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("JoinAll") - .field("elems", &self.elems) - .finish() + f.debug_struct("JoinAll").field("elems", &self.elems).finish() } } @@ -105,9 +101,7 @@ where if all_done { let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let result = iter_pin_mut(elems.as_mut()) - .map(|e| e.take_output().unwrap()) - .collect(); + let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); Poll::Ready(result) } else { Poll::Pending diff --git a/src/future/lazy.rs b/src/future/lazy.rs index 42812d3..e9a8cf2 100644 --- a/src/future/lazy.rs +++ b/src/future/lazy.rs @@ -7,7 +7,7 @@ use futures_core::task::{Context, Poll}; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Lazy<F> { - f: Option<F> + f: Option<F>, } // safe because we never generate `Pin<&mut F>` @@ -33,19 +33,24 @@ impl<F> Unpin for Lazy<F> {} /// # }); /// ``` pub fn lazy<F, R>(f: F) -> Lazy<F> - where F: FnOnce(&mut Context<'_>) -> R, +where + F: FnOnce(&mut Context<'_>) -> R, { assert_future::<R, _>(Lazy { f: Some(f) }) } impl<F, R> FusedFuture for Lazy<F> - where F: FnOnce(&mut Context<'_>) -> R, +where + F: FnOnce(&mut Context<'_>) -> R, { - fn is_terminated(&self) -> bool { self.f.is_none() } + fn is_terminated(&self) -> bool { + self.f.is_none() + } } impl<F, R> Future for Lazy<F> - where F: FnOnce(&mut Context<'_>) -> R, +where + F: FnOnce(&mut Context<'_>) -> R, { type Output = R; diff --git a/src/future/mod.rs b/src/future/mod.rs index 84e457c..7a63e5f 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -21,7 +21,7 @@ pub use futures_task::{FutureObj, LocalFutureObj, UnsafeFutureObj}; #[allow(clippy::module_inception)] mod future; pub use self::future::{ - Flatten, Fuse, FutureExt, Inspect, IntoStream, Map, NeverError, Then, UnitError, MapInto, + Flatten, Fuse, FutureExt, Inspect, IntoStream, Map, MapInto, NeverError, Then, UnitError, }; #[deprecated(note = "This is now an alias for [Flatten](Flatten)")] @@ -40,8 +40,8 @@ pub use self::future::{Shared, WeakShared}; mod try_future; pub use self::try_future::{ - AndThen, ErrInto, OkInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, OrElse, TryFlattenStream, - TryFutureExt, UnwrapOrElse, MapOkOrElse, TryFlatten, + AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, MapOkOrElse, OkInto, + OrElse, TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse, }; #[cfg(feature = "sink")] @@ -112,7 +112,9 @@ cfg_target_has_atomic! { #[cfg(feature = "alloc")] mod abortable; #[cfg(feature = "alloc")] - pub use self::abortable::{abortable, Abortable, AbortHandle, AbortRegistration, Aborted}; + pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted}; + #[cfg(feature = "alloc")] + pub use abortable::abortable; } // Just a helper function to ensure the futures we're returning all have the diff --git a/src/future/option.rs b/src/future/option.rs index 85939d6..426fe50 100644 --- a/src/future/option.rs +++ b/src/future/option.rs @@ -1,7 +1,7 @@ //! Definition of the `Option` (optional step) combinator use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -34,10 +34,7 @@ pin_project! { impl<F: Future> Future for OptionFuture<F> { type Output = Option<F::Output>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.project().inner.as_pin_mut() { Some(x) => x.poll(cx).map(Some), None => Poll::Ready(None), diff --git a/src/future/pending.rs b/src/future/pending.rs index 4311b9a..92c78d5 100644 --- a/src/future/pending.rs +++ b/src/future/pending.rs @@ -34,9 +34,7 @@ impl<T> FusedFuture for Pending<T> { /// # }); /// ``` pub fn pending<T>() -> Pending<T> { - assert_future::<T, _>(Pending { - _data: marker::PhantomData, - }) + assert_future::<T, _>(Pending { _data: marker::PhantomData }) } impl<T> Future for Pending<T> { @@ -47,8 +45,7 @@ impl<T> Future for Pending<T> { } } -impl<T> Unpin for Pending<T> { -} +impl<T> Unpin for Pending<T> {} impl<T> Clone for Pending<T> { fn clone(&self) -> Self { diff --git a/src/future/poll_fn.rs b/src/future/poll_fn.rs index 6ac1ab8..1931157 100644 --- a/src/future/poll_fn.rs +++ b/src/future/poll_fn.rs @@ -35,7 +35,7 @@ impl<F> Unpin for PollFn<F> {} /// ``` pub fn poll_fn<T, F>(f: F) -> PollFn<F> where - F: FnMut(&mut Context<'_>) -> Poll<T> + F: FnMut(&mut Context<'_>) -> Poll<T>, { assert_future::<T, _>(PollFn { f }) } @@ -47,7 +47,8 @@ impl<F> fmt::Debug for PollFn<F> { } impl<T, F> Future for PollFn<F> - where F: FnMut(&mut Context<'_>) -> Poll<T>, +where + F: FnMut(&mut Context<'_>) -> Poll<T>, { type Output = T; diff --git a/src/future/select.rs b/src/future/select.rs index 043ed17..bd44f20 100644 --- a/src/future/select.rs +++ b/src/future/select.rs @@ -1,8 +1,8 @@ use super::assert_future; +use crate::future::{Either, FutureExt}; use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll}; -use crate::future::{Either, FutureExt}; /// Future for the [`select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] @@ -37,13 +37,13 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} /// future::Either, /// future::self, /// }; -/// +/// /// // These two futures have different types even though their outputs have the same type. /// let future1 = async { /// future::pending::<()>().await; // will never finish /// 1 /// }; -/// let future2 = async { +/// let future2 = async { /// future::ready(2).await /// }; /// @@ -82,9 +82,13 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {} /// } /// ``` pub fn select<A, B>(future1: A, future2: B) -> Select<A, B> - where A: Future + Unpin, B: Future + Unpin +where + A: Future + Unpin, + B: Future + Unpin, { - assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select { inner: Some((future1, future2)) }) + assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select { + inner: Some((future1, future2)), + }) } impl<A, B> Future for Select<A, B> @@ -104,7 +108,7 @@ where self.inner = Some((a, b)); Poll::Pending } - } + }, } } } diff --git a/src/future/select_all.rs b/src/future/select_all.rs index 0db90a7..106e508 100644 --- a/src/future/select_all.rs +++ b/src/future/select_all.rs @@ -1,9 +1,9 @@ use super::assert_future; use crate::future::FutureExt; +use alloc::vec::Vec; use core::iter::FromIterator; use core::mem; use core::pin::Pin; -use alloc::vec::Vec; use futures_core::future::Future; use futures_core::task::{Context, Poll}; @@ -32,25 +32,29 @@ impl<Fut: Unpin> Unpin for SelectAll<Fut> {} /// /// This function will panic if the iterator specified contains no items. pub fn select_all<I>(iter: I) -> SelectAll<I::Item> - where I: IntoIterator, - I::Item: Future + Unpin, +where + I: IntoIterator, + I::Item: Future + Unpin, { - let ret = SelectAll { - inner: iter.into_iter().collect() - }; + let ret = SelectAll { inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty()); assert_future::<(<I::Item as Future>::Output, usize, Vec<I::Item>), _>(ret) } +impl<Fut> SelectAll<Fut> { + /// Consumes this combinator, returning the underlying futures. + pub fn into_inner(self) -> Vec<Fut> { + self.inner + } +} + impl<Fut: Future + Unpin> Future for SelectAll<Fut> { type Output = (Fut::Output, usize, Vec<Fut>); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| { - match f.poll_unpin(cx) { - Poll::Pending => None, - Poll::Ready(e) => Some((i, e)), - } + let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(e) => Some((i, e)), }); match item { Some((idx, res)) => { diff --git a/src/future/select_ok.rs b/src/future/select_ok.rs index 52d393c..0ad83c6 100644 --- a/src/future/select_ok.rs +++ b/src/future/select_ok.rs @@ -1,9 +1,9 @@ use super::assert_future; use crate::future::TryFutureExt; +use alloc::vec::Vec; use core::iter::FromIterator; use core::mem; use core::pin::Pin; -use alloc::vec::Vec; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; @@ -30,14 +30,16 @@ impl<Fut: Unpin> Unpin for SelectOk<Fut> {} /// /// This function will panic if the iterator specified contains no items. pub fn select_ok<I>(iter: I) -> SelectOk<I::Item> - where I: IntoIterator, - I::Item: TryFuture + Unpin, +where + I: IntoIterator, + I::Item: TryFuture + Unpin, { - let ret = SelectOk { - inner: iter.into_iter().collect() - }; + let ret = SelectOk { inner: iter.into_iter().collect() }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); - assert_future::<Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>, _>(ret) + assert_future::< + Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>, + _, + >(ret) } impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { @@ -46,12 +48,11 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { - let item = self.inner.iter_mut().enumerate().find_map(|(i, f)| { - match f.try_poll_unpin(cx) { + let item = + self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) { Poll::Pending => None, Poll::Ready(e) => Some((i, e)), - } - }); + }); match item { Some((idx, res)) => { // always remove Ok or Err, if it's not the last Err continue looping @@ -59,18 +60,18 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { match res { Ok(e) => { let rest = mem::replace(&mut self.inner, Vec::new()); - return Poll::Ready(Ok((e, rest))) + return Poll::Ready(Ok((e, rest))); } Err(e) => { if self.inner.is_empty() { - return Poll::Ready(Err(e)) + return Poll::Ready(Err(e)); } } } } None => { // based on the filter above, nothing is ready, return - return Poll::Pending + return Poll::Pending; } } } diff --git a/src/future/try_future/into_future.rs b/src/future/try_future/into_future.rs index e88d603..9f093d0 100644 --- a/src/future/try_future/into_future.rs +++ b/src/future/try_future/into_future.rs @@ -21,17 +21,16 @@ impl<Fut> IntoFuture<Fut> { } impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> { - fn is_terminated(&self) -> bool { self.future.is_terminated() } + fn is_terminated(&self) -> bool { + self.future.is_terminated() + } } impl<Fut: TryFuture> Future for IntoFuture<Fut> { type Output = Result<Fut::Ok, Fut::Error>; #[inline] - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.project().future.try_poll(cx) } } diff --git a/src/future/try_future/try_flatten.rs b/src/future/try_future/try_flatten.rs index 5241b27..1ce4559 100644 --- a/src/future/try_future/try_flatten.rs +++ b/src/future/try_future/try_flatten.rs @@ -2,9 +2,9 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::ready; use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; pin_project! { @@ -24,8 +24,9 @@ impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> { } impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok> - where Fut: TryFuture, - Fut::Ok: TryFuture<Error=Fut::Error>, +where + Fut: TryFuture, + Fut::Ok: TryFuture<Error = Fut::Error>, { fn is_terminated(&self) -> bool { match self { @@ -36,28 +37,27 @@ impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok> } impl<Fut> Future for TryFlatten<Fut, Fut::Ok> - where Fut: TryFuture, - Fut::Ok: TryFuture<Error=Fut::Error>, +where + Fut: TryFuture, + Fut::Ok: TryFuture<Error = Fut::Error>, { type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => { - match ready!(f.try_poll(cx)) { - Ok(f) => self.set(Self::Second { f }), - Err(e) => { - self.set(Self::Empty); - break Err(e); - } + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Err(e); } }, TryFlattenProj::Second { f } => { let output = ready!(f.try_poll(cx)); self.set(Self::Empty); break output; - }, + } TryFlattenProj::Empty => panic!("TryFlatten polled after completion"), } }) @@ -65,8 +65,9 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok> } impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok> - where Fut: TryFuture, - Fut::Ok: TryStream<Error=Fut::Error>, +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, { fn is_terminated(&self) -> bool { match self { @@ -77,21 +78,20 @@ impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok> } impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> - where Fut: TryFuture, - Fut::Ok: TryStream<Error=Fut::Error>, +where + Fut: TryFuture, + Fut::Ok: TryStream<Error = Fut::Error>, { type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => { - match ready!(f.try_poll(cx)) { - Ok(f) => self.set(Self::Second { f }), - Err(e) => { - self.set(Self::Empty); - break Some(Err(e)); - } + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Some(Err(e)); } }, TryFlattenProj::Second { f } => { @@ -100,40 +100,34 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> self.set(Self::Empty); } break output; - }, + } TryFlattenProj::Empty => break None, } }) } } - #[cfg(feature = "sink")] impl<Fut, Item> Sink<Item> for TryFlatten<Fut, Fut::Ok> where Fut: TryFuture, - Fut::Ok: Sink<Item, Error=Fut::Error>, + Fut::Ok: Sink<Item, Error = Fut::Error>, { type Error = Fut::Error; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First { f } => { - match ready!(f.try_poll(cx)) { - Ok(f) => self.set(Self::Second { f }), - Err(e) => { - self.set(Self::Empty); - break Err(e); - } + TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { + Ok(f) => self.set(Self::Second { f }), + Err(e) => { + self.set(Self::Empty); + break Err(e); } }, TryFlattenProj::Second { f } => { break ready!(f.poll_ready(cx)); - }, + } TryFlattenProj::Empty => panic!("poll_ready called after eof"), } }) @@ -155,10 +149,7 @@ where } } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { let res = match self.as_mut().project() { TryFlattenProj::Second { f } => f.poll_close(cx), _ => Poll::Ready(Ok(())), diff --git a/src/future/try_future/try_flatten_err.rs b/src/future/try_future/try_flatten_err.rs index 2e67f11..39b7d9f 100644 --- a/src/future/try_future/try_flatten_err.rs +++ b/src/future/try_future/try_flatten_err.rs @@ -21,8 +21,9 @@ impl<Fut1, Fut2> TryFlattenErr<Fut1, Fut2> { } impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error> - where Fut: TryFuture, - Fut::Error: TryFuture<Ok=Fut::Ok>, +where + Fut: TryFuture, + Fut::Error: TryFuture<Ok = Fut::Ok>, { fn is_terminated(&self) -> bool { match self { @@ -33,28 +34,27 @@ impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error> } impl<Fut> Future for TryFlattenErr<Fut, Fut::Error> - where Fut: TryFuture, - Fut::Error: TryFuture<Ok=Fut::Ok>, +where + Fut: TryFuture, + Fut::Error: TryFuture<Ok = Fut::Ok>, { type Output = Result<Fut::Ok, <Fut::Error as TryFuture>::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenErrProj::First { f } => { - match ready!(f.try_poll(cx)) { - Err(f) => self.set(Self::Second { f }), - Ok(e) => { - self.set(Self::Empty); - break Ok(e); - } + TryFlattenErrProj::First { f } => match ready!(f.try_poll(cx)) { + Err(f) => self.set(Self::Second { f }), + Ok(e) => { + self.set(Self::Empty); + break Ok(e); } }, TryFlattenErrProj::Second { f } => { let output = ready!(f.try_poll(cx)); self.set(Self::Empty); break output; - }, + } TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"), } }) diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs index 371f753..29244af 100644 --- a/src/future/try_join_all.rs +++ b/src/future/try_join_all.rs @@ -1,14 +1,14 @@ //! Definition of the `TryJoinAll` combinator, waiting for all of a list of //! futures to finish with either success or error. +use alloc::boxed::Box; +use alloc::vec::Vec; use core::fmt; use core::future::Future; use core::iter::FromIterator; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use alloc::boxed::Box; -use alloc::vec::Vec; use super::{assert_future, TryFuture, TryMaybeDone}; @@ -16,15 +16,13 @@ fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() } - .iter_mut() - .map(|t| unsafe { Pin::new_unchecked(t) }) + unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) } enum FinalState<E = ()> { Pending, AllDone, - Error(E) + Error(E), } /// Future for the [`try_join_all`] function. @@ -43,9 +41,7 @@ where F::Error: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll") - .field("elems", &self.elems) - .finish() + f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() } } @@ -93,9 +89,9 @@ where I::Item: TryFuture, { let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(TryJoinAll { - elems: elems.into(), - }) + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { elems: elems.into() }, + ) } impl<F> Future for TryJoinAll<F> @@ -110,7 +106,7 @@ where for elem in iter_pin_mut(self.elems.as_mut()) { match elem.try_poll(cx) { Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {}, + Poll::Ready(Ok(())) => {} Poll::Ready(Err(e)) => { state = FinalState::Error(e); break; @@ -122,15 +118,14 @@ where FinalState::Pending => Poll::Pending, FinalState::AllDone => { let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = iter_pin_mut(elems.as_mut()) - .map(|e| e.take_output().unwrap()) - .collect(); + let results = + iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); Poll::Ready(Ok(results)) - }, + } FinalState::Error(e) => { let _ = mem::replace(&mut self.elems, Box::pin([])); Poll::Ready(Err(e)) - }, + } } } } diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs index dfd2900..24044d2 100644 --- a/src/future/try_maybe_done.rs +++ b/src/future/try_maybe_done.rs @@ -49,13 +49,13 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> { #[inline] pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> { match &*self { - Self::Done(_) => {}, + Self::Done(_) => {} Self::Future(_) | Self::Gone => return None, } unsafe { match mem::replace(self.get_unchecked_mut(), Self::Gone) { TryMaybeDone::Done(output) => Some(output), - _ => unreachable!() + _ => unreachable!(), } } } @@ -76,16 +76,14 @@ impl<Fut: TryFuture> Future for TryMaybeDone<Fut> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { unsafe { match self.as_mut().get_unchecked_mut() { - TryMaybeDone::Future(f) => { - match ready!(Pin::new_unchecked(f).try_poll(cx)) { - Ok(res) => self.set(Self::Done(res)), - Err(e) => { - self.set(Self::Gone); - return Poll::Ready(Err(e)); - } + TryMaybeDone::Future(f) => match ready!(Pin::new_unchecked(f).try_poll(cx)) { + Ok(res) => self.set(Self::Done(res)), + Err(e) => { + self.set(Self::Gone); + return Poll::Ready(Err(e)); } }, - TryMaybeDone::Done(_) => {}, + TryMaybeDone::Done(_) => {} TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"), } } diff --git a/src/future/try_select.rs b/src/future/try_select.rs index b26eed3..4d0b7ff 100644 --- a/src/future/try_select.rs +++ b/src/future/try_select.rs @@ -1,7 +1,7 @@ +use crate::future::{Either, TryFutureExt}; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; -use crate::future::{Either, TryFutureExt}; /// Future for the [`try_select()`] function. #[must_use = "futures do nothing unless you `.await` or poll them"] @@ -48,22 +48,23 @@ impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {} /// } /// ``` pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B> - where A: TryFuture + Unpin, B: TryFuture + Unpin +where + A: TryFuture + Unpin, + B: TryFuture + Unpin, { - super::assert_future::<Result< - Either<(A::Ok, B), (B::Ok, A)>, - Either<(A::Error, B), (B::Error, A)>, - >, _>(TrySelect { inner: Some((future1, future2)) }) + super::assert_future::< + Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>, + _, + >(TrySelect { inner: Some((future1, future2)) }) } impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> - where A: TryFuture, B: TryFuture +where + A: TryFuture, + B: TryFuture, { #[allow(clippy::type_complexity)] - type Output = Result< - Either<(A::Ok, B), (B::Ok, A)>, - Either<(A::Error, B), (B::Error, A)>, - >; + type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); @@ -77,7 +78,7 @@ impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> self.inner = Some((a, b)); Poll::Pending } - } + }, } } } diff --git a/src/io/allow_std.rs b/src/io/allow_std.rs index 9aa8eb4..1d13e0c 100644 --- a/src/io/allow_std.rs +++ b/src/io/allow_std.rs @@ -1,9 +1,9 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; -use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom}; -use std::{fmt, io}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; use std::pin::Pin; +use std::{fmt, io}; /// A simple wrapper type which allows types which implement only /// implement `std::io::Read` or `std::io::Write` @@ -35,7 +35,7 @@ macro_rules! try_with_interrupt { } } } - } + }; } impl<T> AllowStdIo<T> { @@ -60,7 +60,10 @@ impl<T> AllowStdIo<T> { } } -impl<T> io::Write for AllowStdIo<T> where T: io::Write { +impl<T> io::Write for AllowStdIo<T> +where + T: io::Write, +{ fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) } @@ -78,16 +81,23 @@ impl<T> io::Write for AllowStdIo<T> where T: io::Write { } } -impl<T> AsyncWrite for AllowStdIo<T> where T: io::Write { - fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) - -> Poll<io::Result<usize>> - { +impl<T> AsyncWrite for AllowStdIo<T> +where + T: io::Write, +{ + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { Poll::Ready(Ok(try_with_interrupt!(self.0.write(buf)))) } - fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) - -> Poll<io::Result<usize>> - { + fn poll_write_vectored( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { Poll::Ready(Ok(try_with_interrupt!(self.0.write_vectored(bufs)))) } @@ -101,7 +111,10 @@ impl<T> AsyncWrite for AllowStdIo<T> where T: io::Write { } } -impl<T> io::Read for AllowStdIo<T> where T: io::Read { +impl<T> io::Read for AllowStdIo<T> +where + T: io::Read, +{ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) } @@ -123,16 +136,23 @@ impl<T> io::Read for AllowStdIo<T> where T: io::Read { } } -impl<T> AsyncRead for AllowStdIo<T> where T: io::Read { - fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) - -> Poll<io::Result<usize>> - { +impl<T> AsyncRead for AllowStdIo<T> +where + T: io::Read, +{ + fn poll_read( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { Poll::Ready(Ok(try_with_interrupt!(self.0.read(buf)))) } - fn poll_read_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) - -> Poll<io::Result<usize>> - { + fn poll_read_vectored( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<io::Result<usize>> { Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs)))) } @@ -142,21 +162,32 @@ impl<T> AsyncRead for AllowStdIo<T> where T: io::Read { } } -impl<T> io::Seek for AllowStdIo<T> where T: io::Seek { +impl<T> io::Seek for AllowStdIo<T> +where + T: io::Seek, +{ fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { self.0.seek(pos) } } -impl<T> AsyncSeek for AllowStdIo<T> where T: io::Seek { - fn poll_seek(mut self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) - -> Poll<io::Result<u64>> - { +impl<T> AsyncSeek for AllowStdIo<T> +where + T: io::Seek, +{ + fn poll_seek( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { Poll::Ready(Ok(try_with_interrupt!(self.0.seek(pos)))) } } -impl<T> io::BufRead for AllowStdIo<T> where T: io::BufRead { +impl<T> io::BufRead for AllowStdIo<T> +where + T: io::BufRead, +{ fn fill_buf(&mut self) -> io::Result<&[u8]> { self.0.fill_buf() } @@ -165,10 +196,11 @@ impl<T> io::BufRead for AllowStdIo<T> where T: io::BufRead { } } -impl<T> AsyncBufRead for AllowStdIo<T> where T: io::BufRead { - fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) - -> Poll<io::Result<&[u8]>> - { +impl<T> AsyncBufRead for AllowStdIo<T> +where + T: io::BufRead, +{ + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { let this: *mut Self = &mut *self as *mut _; Poll::Ready(Ok(try_with_interrupt!(unsafe { &mut *this }.0.fill_buf()))) } diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 270a086..5931edc 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -1,3 +1,4 @@ +use super::DEFAULT_BUF_SIZE; use futures_core::ready; use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] @@ -7,7 +8,6 @@ use pin_project_lite::pin_project; use std::io::{self, Read}; use std::pin::Pin; use std::{cmp, fmt}; -use super::DEFAULT_BUF_SIZE; pin_project! { /// The `BufReader` struct adds buffering to any reader. @@ -51,12 +51,7 @@ impl<R: AsyncRead> BufReader<R> { let mut buffer = Vec::with_capacity(capacity); buffer.set_len(capacity); super::initialize(&inner, &mut buffer); - Self { - inner, - buffer: buffer.into_boxed_slice(), - pos: 0, - cap: 0, - } + Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 } } } @@ -123,10 +118,7 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> { } impl<R: AsyncRead> AsyncBufRead for BufReader<R> { - fn poll_fill_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<&[u8]>> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { let this = self.project(); // If we've reached the end of our internal buffer then we need to fetch @@ -192,7 +184,8 @@ impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { // support seeking by i64::min_value() so we need to handle underflow when subtracting // remainder. if let Some(offset) = n.checked_sub(remainder) { - result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?; + result = + ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?; } else { // seek backwards by our remainder, and then by the offset ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?; diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index 991a365..f292b87 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -1,3 +1,4 @@ +use super::DEFAULT_BUF_SIZE; use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, SeekFrom}; @@ -5,7 +6,6 @@ use pin_project_lite::pin_project; use std::fmt; use std::io::{self, Write}; use std::pin::Pin; -use super::DEFAULT_BUF_SIZE; pin_project! { /// Wraps a writer and buffers its output. @@ -46,11 +46,7 @@ impl<W: AsyncWrite> BufWriter<W> { /// Creates a new `BufWriter` with the specified buffer capacity. pub fn with_capacity(cap: usize, inner: W) -> Self { - Self { - inner, - buf: Vec::with_capacity(cap), - written: 0, - } + Self { inner, buf: Vec::with_capacity(cap), written: 0 } } fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { diff --git a/src/io/chain.rs b/src/io/chain.rs index 1b6a335..a35c50d 100644 --- a/src/io/chain.rs +++ b/src/io/chain.rs @@ -26,11 +26,7 @@ where U: AsyncRead, { pub(super) fn new(first: T, second: U) -> Self { - Self { - first, - second, - done_first: false, - } + Self { first, second, done_first: false } } /// Gets references to the underlying readers in this `Chain`. diff --git a/src/io/copy.rs b/src/io/copy.rs index bc59255..c80add2 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -1,10 +1,10 @@ +use super::{copy_buf, BufReader, CopyBuf}; use futures_core::future::Future; use futures_core::task::{Context, Poll}; use futures_io::{AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::io; use std::pin::Pin; -use super::{BufReader, copy_buf, CopyBuf}; -use pin_project_lite::pin_project; /// Creates a future which copies all the bytes from one object to another. /// @@ -36,9 +36,7 @@ where R: AsyncRead, W: AsyncWrite + Unpin + ?Sized, { - Copy { - inner: copy_buf(BufReader::new(reader), writer), - } + Copy { inner: copy_buf(BufReader::new(reader), writer) } } pin_project! { diff --git a/src/io/copy_buf.rs b/src/io/copy_buf.rs index 6adf594..50f7abd 100644 --- a/src/io/copy_buf.rs +++ b/src/io/copy_buf.rs @@ -2,9 +2,9 @@ use futures_core::future::Future; use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::io; use std::pin::Pin; -use pin_project_lite::pin_project; /// Creates a future which copies all the bytes from one object to another. /// @@ -36,11 +36,7 @@ where R: AsyncBufRead, W: AsyncWrite + Unpin + ?Sized, { - CopyBuf { - reader, - writer, - amt: 0, - } + CopyBuf { reader, writer, amt: 0 } } pin_project! { @@ -56,8 +52,9 @@ pin_project! { } impl<R, W> Future for CopyBuf<'_, R, W> - where R: AsyncBufRead, - W: AsyncWrite + Unpin + ?Sized, +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, { type Output = io::Result<u64>; @@ -72,7 +69,7 @@ impl<R, W> Future for CopyBuf<'_, R, W> let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; if i == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); } *this.amt += i as u64; this.reader.as_mut().consume(i); diff --git a/src/io/cursor.rs b/src/io/cursor.rs index 084fb08..b6fb372 100644 --- a/src/io/cursor.rs +++ b/src/io/cursor.rs @@ -43,9 +43,7 @@ impl<T> Cursor<T> { /// # force_inference(&buff); /// ``` pub fn new(inner: T) -> Self { - Self { - inner: io::Cursor::new(inner), - } + Self { inner: io::Cursor::new(inner) } } /// Consumes this cursor, returning the underlying value. @@ -199,15 +197,19 @@ where macro_rules! delegate_async_write_to_stdio { () => { - fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) - -> Poll<io::Result<usize>> - { + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { Poll::Ready(io::Write::write(&mut self.inner, buf)) } - fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) - -> Poll<io::Result<usize>> - { + fn poll_write_vectored( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { Poll::Ready(io::Write::write_vectored(&mut self.inner, bufs)) } @@ -218,7 +220,7 @@ macro_rules! delegate_async_write_to_stdio { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { self.poll_flush(cx) } - } + }; } impl AsyncWrite for Cursor<&mut [u8]> { diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs index 6fb3ec7..19b0d20 100644 --- a/src/io/fill_buf.rs +++ b/src/io/fill_buf.rs @@ -20,7 +20,8 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> { } impl<'a, R> Future for FillBuf<'a, R> - where R: AsyncBufRead + ?Sized + Unpin, +where + R: AsyncBufRead + ?Sized + Unpin, { type Output = io::Result<&'a [u8]>; diff --git a/src/io/flush.rs b/src/io/flush.rs index ece0a7c..b75d14c 100644 --- a/src/io/flush.rs +++ b/src/io/flush.rs @@ -20,7 +20,8 @@ impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> { } impl<W> Future for Flush<'_, W> - where W: AsyncWrite + ?Sized + Unpin, +where + W: AsyncWrite + ?Sized + Unpin, { type Output = io::Result<()>; diff --git a/src/io/into_sink.rs b/src/io/into_sink.rs index 885ba2f..384b8e3 100644 --- a/src/io/into_sink.rs +++ b/src/io/into_sink.rs @@ -2,9 +2,9 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncWrite; use futures_sink::Sink; +use pin_project_lite::pin_project; use std::io; use std::pin::Pin; -use pin_project_lite::pin_project; #[derive(Debug)] struct Block<Item> { @@ -36,8 +36,7 @@ impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> { fn poll_flush_buffer( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll<Result<(), io::Error>> - { + ) -> Poll<Result<(), io::Error>> { let mut this = self.project(); if let Some(buffer) = this.buffer { @@ -53,47 +52,30 @@ impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> { *this.buffer = None; Poll::Ready(Ok(())) } - } impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> { type Error = io::Error; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> - { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.poll_flush_buffer(cx))?; Poll::Ready(Ok(())) } #[allow(clippy::debug_assert_with_mut_call)] - fn start_send( - self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> - { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { debug_assert!(self.buffer.is_none()); *self.project().buffer = Some(Block { offset: 0, bytes: item }); Ok(()) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> - { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().poll_flush_buffer(cx))?; ready!(self.project().writer.poll_flush(cx))?; Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> - { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().poll_flush_buffer(cx))?; ready!(self.project().writer.poll_close(cx))?; Poll::Ready(Ok(())) diff --git a/src/io/lines.rs b/src/io/lines.rs index 6ae7392..13e70df 100644 --- a/src/io/lines.rs +++ b/src/io/lines.rs @@ -1,12 +1,12 @@ +use super::read_line::read_line_internal; use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_io::AsyncBufRead; +use pin_project_lite::pin_project; use std::io; use std::mem; use std::pin::Pin; -use super::read_line::read_line_internal; -use pin_project_lite::pin_project; pin_project! { /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method. @@ -23,12 +23,7 @@ pin_project! { impl<R: AsyncBufRead> Lines<R> { pub(super) fn new(reader: R) -> Self { - Self { - reader, - buf: String::new(), - bytes: Vec::new(), - read: 0, - } + Self { reader, buf: String::new(), bytes: Vec::new(), read: 0 } } } @@ -39,7 +34,7 @@ impl<R: AsyncBufRead> Stream for Lines<R> { let this = self.project(); let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?; if n == 0 && this.buf.is_empty() { - return Poll::Ready(None) + return Poll::Ready(None); } if this.buf.ends_with('\n') { this.buf.pop(); diff --git a/src/io/mod.rs b/src/io/mod.rs index 1437930..b96223d 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -21,18 +21,18 @@ use crate::compat::Compat; use crate::future::assert_future; use crate::stream::assert_stream; -use std::{ptr, pin::Pin}; +use std::{pin::Pin, ptr}; // Re-export some types from `std::io` so that users don't have to deal // with conflicts when `use`ing `futures::io` and `std::io`. #[doc(no_inline)] -pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; -#[doc(no_inline)] #[cfg(feature = "read-initializer")] #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))] pub use std::io::Initializer; +#[doc(no_inline)] +pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; -pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead}; +pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; // used by `BufReader` and `BufWriter` // https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 @@ -126,7 +126,7 @@ mod sink; pub use self::sink::{sink, Sink}; mod split; -pub use self::split::{ReadHalf, WriteHalf, ReuniteError}; +pub use self::split::{ReadHalf, ReuniteError, WriteHalf}; mod take; pub use self::take::Take; @@ -206,7 +206,8 @@ pub trait AsyncReadExt: AsyncRead { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<usize>, _>(Read::new(self, buf)) } @@ -217,7 +218,8 @@ pub trait AsyncReadExt: AsyncRead { /// The returned future will resolve to the number of bytes read once the read /// operation is completed. fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs)) } @@ -259,11 +261,9 @@ pub trait AsyncReadExt: AsyncRead { /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); /// # }); /// ``` - fn read_exact<'a>( - &'a mut self, - buf: &'a mut [u8], - ) -> ReadExact<'a, Self> - where Self: Unpin, + fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self> + where + Self: Unpin, { assert_future::<Result<()>, _>(ReadExact::new(self, buf)) } @@ -287,11 +287,9 @@ pub trait AsyncReadExt: AsyncRead { /// assert_eq!(output, vec![1, 2, 3, 4]); /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` - fn read_to_end<'a>( - &'a mut self, - buf: &'a mut Vec<u8>, - ) -> ReadToEnd<'a, Self> - where Self: Unpin, + fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self> + where + Self: Unpin, { assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf)) } @@ -315,11 +313,9 @@ pub trait AsyncReadExt: AsyncRead { /// assert_eq!(buffer, String::from("1234")); /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` - fn read_to_string<'a>( - &'a mut self, - buf: &'a mut String, - ) -> ReadToString<'a, Self> - where Self: Unpin, + fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self> + where + Self: Unpin, { assert_future::<Result<usize>, _>(ReadToString::new(self, buf)) } @@ -354,7 +350,8 @@ pub trait AsyncReadExt: AsyncRead { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>) - where Self: AsyncWrite + Sized, + where + Self: AsyncWrite + Sized, { let (r, w) = split::split(self); (assert_read(r), assert_write(w)) @@ -380,7 +377,8 @@ pub trait AsyncReadExt: AsyncRead { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn take(self, limit: u64) -> Take<Self> - where Self: Sized + where + Self: Sized, { assert_read(Take::new(self, limit)) } @@ -394,7 +392,8 @@ pub trait AsyncReadExt: AsyncRead { #[cfg(feature = "io-compat")] #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] fn compat(self) -> Compat<Self> - where Self: Sized + Unpin, + where + Self: Sized + Unpin, { Compat::new(self) } @@ -427,14 +426,16 @@ pub trait AsyncWriteExt: AsyncWrite { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn flush(&mut self) -> Flush<'_, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<()>, _>(Flush::new(self)) } /// Creates a future which will entirely close this `AsyncWrite`. fn close(&mut self) -> Close<'_, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<()>, _>(Close::new(self)) } @@ -444,7 +445,8 @@ pub trait AsyncWriteExt: AsyncWrite { /// The returned future will resolve to the number of bytes written once the write /// operation is completed. fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<usize>, _>(Write::new(self, buf)) } @@ -455,7 +457,8 @@ pub trait AsyncWriteExt: AsyncWrite { /// The returned future will resolve to the number of bytes written once the write /// operation is completed. fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs)) } @@ -481,7 +484,8 @@ pub trait AsyncWriteExt: AsyncWrite { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<()>, _>(WriteAll::new(self, buf)) } @@ -547,7 +551,8 @@ pub trait AsyncWriteExt: AsyncWrite { #[cfg(feature = "io-compat")] #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] fn compat_write(self) -> Compat<Self> - where Self: Sized + Unpin, + where + Self: Sized + Unpin, { Compat::new(self) } @@ -581,7 +586,8 @@ pub trait AsyncWriteExt: AsyncWrite { #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item> - where Self: Sized, + where + Self: Sized, { crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self)) } @@ -597,10 +603,22 @@ pub trait AsyncSeekExt: AsyncSeek { /// In the case of an error the buffer and the object will be discarded, with /// the error yielded. fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<u64>, _>(Seek::new(self, pos)) } + + /// Creates a future which will return the current seek position from the + /// start of the stream. + /// + /// This is equivalent to `self.seek(SeekFrom::Current(0))`. + fn stream_position(&mut self) -> Seek<'_, Self> + where + Self: Unpin, + { + self.seek(SeekFrom::Current(0)) + } } impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {} @@ -631,7 +649,8 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn fill_buf(&mut self) -> FillBuf<'_, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<&[u8]>, _>(FillBuf::new(self)) } @@ -654,7 +673,8 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn consume_unpin(&mut self, amt: usize) - where Self: Unpin, + where + Self: Unpin, { Pin::new(self).consume(amt) } @@ -700,12 +720,9 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// assert_eq!(buf, b""); /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` - fn read_until<'a>( - &'a mut self, - byte: u8, - buf: &'a mut Vec<u8>, - ) -> ReadUntil<'a, Self> - where Self: Unpin, + fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self> + where + Self: Unpin, { assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf)) } @@ -762,7 +779,8 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> - where Self: Unpin, + where + Self: Unpin, { assert_future::<Result<usize>, _>(ReadLine::new(self, buf)) } @@ -800,7 +818,8 @@ pub trait AsyncBufReadExt: AsyncBufRead { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` fn lines(self) -> Lines<Self> - where Self: Sized, + where + Self: Sized, { assert_stream::<Result<String>, _>(Lines::new(self)) } diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs index f2e0440..02e38c3 100644 --- a/src/io/read_exact.rs +++ b/src/io/read_exact.rs @@ -1,6 +1,6 @@ use crate::io::AsyncRead; -use futures_core::ready; use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use std::io; use std::mem; @@ -34,7 +34,7 @@ impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> { this.buf = rest; } if n == 0 { - return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())) + return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())); } } Poll::Ready(Ok(())) diff --git a/src/io/read_line.rs b/src/io/read_line.rs index d402c96..c75af94 100644 --- a/src/io/read_line.rs +++ b/src/io/read_line.rs @@ -1,12 +1,12 @@ -use futures_core::ready; +use super::read_until::read_until_internal; use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncBufRead; use std::io; use std::mem; use std::pin::Pin; use std::str; -use super::read_until::read_until_internal; /// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method. #[derive(Debug)] @@ -22,12 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {} impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { - Self { - reader, - bytes: mem::replace(buf, String::new()).into_bytes(), - buf, - read: 0, - } + Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, read: 0 } } } diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs index 7bd2c89..919d7d1 100644 --- a/src/io/read_to_end.rs +++ b/src/io/read_to_end.rs @@ -20,11 +20,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> Self { let start_len = buf.len(); - Self { - reader, - buf, - start_len, - } + Self { reader, buf, start_len } } } @@ -56,10 +52,7 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( buf: &mut Vec<u8>, start_len: usize, ) -> Poll<io::Result<usize>> { - let mut g = Guard { - len: buf.len(), - buf, - }; + let mut g = Guard { len: buf.len(), buf }; loop { if g.len == g.buf.len() { unsafe { diff --git a/src/io/read_to_string.rs b/src/io/read_to_string.rs index 9242654..457af59 100644 --- a/src/io/read_to_string.rs +++ b/src/io/read_to_string.rs @@ -1,6 +1,6 @@ use super::read_to_end::read_to_end_internal; -use futures_core::ready; use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncRead; use std::pin::Pin; @@ -22,12 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { let start_len = buf.len(); - Self { - reader, - bytes: mem::replace(buf, String::new()).into_bytes(), - buf, - start_len, - } + Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, start_len } } } @@ -41,10 +36,7 @@ fn read_to_string_internal<R: AsyncRead + ?Sized>( let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len)); if str::from_utf8(bytes).is_err() { Poll::Ready(ret.and_then(|_| { - Err(io::Error::new( - io::ErrorKind::InvalidData, - "stream did not contain valid UTF-8", - )) + Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8")) })) } else { debug_assert!(buf.is_empty()); diff --git a/src/io/split.rs b/src/io/split.rs index 185c21c..3f1b9af 100644 --- a/src/io/split.rs +++ b/src/io/split.rs @@ -1,8 +1,8 @@ use crate::lock::BiLock; +use core::fmt; use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; -use core::fmt; use std::io; use std::pin::Pin; @@ -18,12 +18,9 @@ pub struct WriteHalf<T> { handle: BiLock<T>, } -fn lock_and_then<T, U, E, F>( - lock: &BiLock<T>, - cx: &mut Context<'_>, - f: F -) -> Poll<Result<U, E>> - where F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<U, E>> +fn lock_and_then<T, U, E, F>(lock: &BiLock<T>, cx: &mut Context<'_>, f: F) -> Poll<Result<U, E>> +where + F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<U, E>>, { let mut l = ready!(lock.poll_lock(cx)); f(l.as_pin_mut(), cx) @@ -39,9 +36,9 @@ impl<T: Unpin> ReadHalf<T> { /// together. Succeeds only if the `ReadHalf<T>` and `WriteHalf<T>` are /// a matching pair originating from the same call to `AsyncReadExt::split`. pub fn reunite(self, other: WriteHalf<T>) -> Result<T, ReuniteError<T>> { - self.handle.reunite(other.handle).map_err(|err| { - ReuniteError(ReadHalf { handle: err.0 }, WriteHalf { handle: err.1 }) - }) + self.handle + .reunite(other.handle) + .map_err(|err| ReuniteError(ReadHalf { handle: err.0 }, WriteHalf { handle: err.1 })) } } @@ -55,29 +52,37 @@ impl<T: Unpin> WriteHalf<T> { } impl<R: AsyncRead> AsyncRead for ReadHalf<R> { - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll<io::Result<usize>> - { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf)) } - fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) - -> Poll<io::Result<usize>> - { + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<io::Result<usize>> { lock_and_then(&self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs)) } } impl<W: AsyncWrite> AsyncWrite for WriteHalf<W> { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) - -> Poll<io::Result<usize>> - { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf)) } - fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) - -> Poll<io::Result<usize>> - { + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { lock_and_then(&self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs)) } @@ -96,9 +101,7 @@ pub struct ReuniteError<T>(pub ReadHalf<T>, pub WriteHalf<T>); impl<T> fmt::Debug for ReuniteError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReuniteError") - .field(&"...") - .finish() + f.debug_tuple("ReuniteError").field(&"...").finish() } } diff --git a/src/io/take.rs b/src/io/take.rs index 687a697..0583020 100644 --- a/src/io/take.rs +++ b/src/io/take.rs @@ -2,10 +2,10 @@ use futures_core::ready; use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; -use futures_io::{AsyncRead, AsyncBufRead}; +use futures_io::{AsyncBufRead, AsyncRead}; use pin_project_lite::pin_project; -use std::{cmp, io}; use std::pin::Pin; +use std::{cmp, io}; pin_project! { /// Reader for the [`take`](super::AsyncReadExt::take) method. @@ -14,14 +14,13 @@ pin_project! { pub struct Take<R> { #[pin] inner: R, - // Add '_' to avoid conflicts with `limit` method. - limit_: u64, + limit: u64, } } impl<R: AsyncRead> Take<R> { pub(super) fn new(inner: R, limit: u64) -> Self { - Self { inner, limit_: limit } + Self { inner, limit } } /// Returns the remaining number of bytes that can be @@ -48,7 +47,7 @@ impl<R: AsyncRead> Take<R> { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` pub fn limit(&self) -> u64 { - self.limit_ + self.limit } /// Sets the number of bytes that can be read before this instance will @@ -78,7 +77,7 @@ impl<R: AsyncRead> Take<R> { /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` pub fn set_limit(&mut self, limit: u64) { - self.limit_ = limit + self.limit = limit } delegate_access_inner!(inner, R, ()); @@ -92,13 +91,13 @@ impl<R: AsyncRead> AsyncRead for Take<R> { ) -> Poll<Result<usize, io::Error>> { let this = self.project(); - if *this.limit_ == 0 { + if *this.limit == 0 { return Poll::Ready(Ok(0)); } - let max = cmp::min(buf.len() as u64, *this.limit_) as usize; + let max = cmp::min(buf.len() as u64, *this.limit) as usize; let n = ready!(this.inner.poll_read(cx, &mut buf[..max]))?; - *this.limit_ -= n as u64; + *this.limit -= n as u64; Poll::Ready(Ok(n)) } @@ -113,12 +112,12 @@ impl<R: AsyncBufRead> AsyncBufRead for Take<R> { let this = self.project(); // Don't call into inner reader at all at EOF because it may still block - if *this.limit_ == 0 { + if *this.limit == 0 { return Poll::Ready(Ok(&[])); } let buf = ready!(this.inner.poll_fill_buf(cx)?); - let cap = cmp::min(buf.len() as u64, *this.limit_) as usize; + let cap = cmp::min(buf.len() as u64, *this.limit) as usize; Poll::Ready(Ok(&buf[..cap])) } @@ -126,8 +125,8 @@ impl<R: AsyncBufRead> AsyncBufRead for Take<R> { let this = self.project(); // Don't let callers reset the limit by passing an overlarge value - let amt = cmp::min(amt as u64, *this.limit_) as usize; - *this.limit_ -= amt as u64; + let amt = cmp::min(amt as u64, *this.limit) as usize; + *this.limit -= amt as u64; this.inner.consume(amt); } } diff --git a/src/io/window.rs b/src/io/window.rs index 3424197..77b7267 100644 --- a/src/io/window.rs +++ b/src/io/window.rs @@ -30,10 +30,7 @@ impl<T: AsRef<[u8]>> Window<T> { /// Further methods can be called on the returned `Window<T>` to alter the /// window into the data provided. pub fn new(t: T) -> Self { - Self { - range: 0..t.as_ref().len(), - inner: t, - } + Self { range: 0..t.as_ref().len(), inner: t } } /// Gets a shared reference to the underlying buffer inside of this diff --git a/src/io/write_all_vectored.rs b/src/io/write_all_vectored.rs index 380604d..f465209 100644 --- a/src/io/write_all_vectored.rs +++ b/src/io/write_all_vectored.rs @@ -1,5 +1,5 @@ -use futures_core::ready; use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncWrite; use futures_io::IoSlice; @@ -56,11 +56,7 @@ mod tests { /// Create a new writer that reads from at most `n_bufs` and reads /// `per_call` bytes (in total) per call to write. fn test_writer(n_bufs: usize, per_call: usize) -> TestWriter { - TestWriter { - n_bufs, - per_call, - written: Vec::new(), - } + TestWriter { n_bufs, per_call, written: Vec::new() } } // TODO: maybe move this the future-test crate? @@ -110,10 +106,9 @@ mod tests { let expected = $expected; match $e { Poll::Ready(Ok(ok)) if ok == expected => {} - got => panic!( - "unexpected result, got: {:?}, wanted: Ready(Ok({:?}))", - got, expected - ), + got => { + panic!("unexpected result, got: {:?}, wanted: Ready(Ok({:?}))", got, expected) + } } }; } @@ -154,11 +149,7 @@ mod tests { assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3); // Read at most 3 bytes from three buffers. - let bufs = &[ - IoSlice::new(&[3]), - IoSlice::new(&[4]), - IoSlice::new(&[5, 5]), - ]; + let bufs = &[IoSlice::new(&[3]), IoSlice::new(&[4]), IoSlice::new(&[5, 5])]; assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3); assert_eq!(dst.written, &[1, 2, 2, 3, 4, 5]); @@ -1,25 +1,16 @@ //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, //! and the `AsyncRead` and `AsyncWrite` traits. -#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] #![cfg_attr(not(feature = "std"), no_std)] -#![warn( - missing_docs, - missing_debug_implementations, - rust_2018_idioms, - unreachable_pub -)] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] // It cannot be included in the published code because this lints have false positives in the minimum required version. #![cfg_attr(test, warn(single_use_lifetimes))] #![warn(clippy::all)] #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] #![cfg_attr(docsrs, feature(doc_cfg))] -#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] -compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); - #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); @@ -58,7 +49,7 @@ pub mod __private { macro_rules! cfg_target_has_atomic { ($($item:item)*) => {$( - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] $item )*}; } @@ -309,18 +300,18 @@ macro_rules! delegate_all { pub mod future; #[doc(hidden)] -pub use crate::future::{FutureExt, TryFutureExt}; +pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt}; pub mod stream; #[doc(hidden)] -pub use crate::stream::{StreamExt, TryStreamExt}; +pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt}; #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub mod sink; #[cfg(feature = "sink")] #[doc(hidden)] -pub use crate::sink::SinkExt; +pub use crate::sink::{Sink, SinkExt}; pub mod task; @@ -337,10 +328,18 @@ pub mod io; #[cfg(feature = "io")] #[cfg(feature = "std")] #[doc(hidden)] -pub use crate::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +pub use crate::io::{ + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, + AsyncWriteExt, +}; #[cfg(feature = "alloc")] pub mod lock; +cfg_target_has_atomic! { + #[cfg(feature = "alloc")] + mod abortable; +} + mod fns; mod unfold_state; diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs index 600e16e..2f51ae7 100644 --- a/src/lock/bilock.rs +++ b/src/lock/bilock.rs @@ -1,16 +1,16 @@ //! Futures-powered synchronization primitives. -#[cfg(feature = "bilock")] -use futures_core::future::Future; -use futures_core::task::{Context, Poll, Waker}; +use alloc::boxed::Box; +use alloc::sync::Arc; use core::cell::UnsafeCell; use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::sync::atomic::AtomicUsize; use core::sync::atomic::Ordering::SeqCst; -use alloc::boxed::Box; -use alloc::sync::Arc; +#[cfg(feature = "bilock")] +use futures_core::future::Future; +use futures_core::task::{Context, Poll, Waker}; /// A type of futures-powered synchronization primitive which is a mutex between /// two possible owners. @@ -61,10 +61,7 @@ impl<T> BiLock<T> { /// Similarly, reuniting the lock and extracting the inner value is only /// possible when `T` is `Unpin`. pub fn new(t: T) -> (Self, Self) { - let arc = Arc::new(Inner { - state: AtomicUsize::new(0), - value: Some(UnsafeCell::new(t)), - }); + let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) }); (Self { arc: arc.clone() }, Self { arc }) } @@ -103,11 +100,11 @@ impl<T> BiLock<T> { let mut prev = Box::from_raw(n as *mut Waker); *prev = cx.waker().clone(); waker = Some(prev); - } + }, } // type ascription for safety's sake! - let me: Box<Waker> = waker.take().unwrap_or_else(||Box::new(cx.waker().clone())); + let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); let me = Box::into_raw(me) as usize; match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { @@ -145,9 +142,7 @@ impl<T> BiLock<T> { #[cfg(feature = "bilock")] #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] pub fn lock(&self) -> BiLockAcquire<'_, T> { - BiLockAcquire { - bilock: self, - } + BiLockAcquire { bilock: self } } /// Attempts to put the two "halves" of a `BiLock<T>` back together and @@ -181,7 +176,7 @@ impl<T> BiLock<T> { // up as its now their turn. n => unsafe { Box::from_raw(n as *mut Waker).wake(); - } + }, } } } @@ -205,9 +200,7 @@ pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>); impl<T> fmt::Debug for ReuniteError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReuniteError") - .field(&"...") - .finish() + f.debug_tuple("ReuniteError").field(&"...").finish() } } diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs index a78de62..a849aee 100644 --- a/src/lock/mutex.rs +++ b/src/lock/mutex.rs @@ -1,13 +1,13 @@ use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use slab::Slab; -use std::{fmt, mem}; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; -use std::sync::Mutex as StdMutex; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex as StdMutex; +use std::{fmt, mem}; /// A futures-aware mutex. /// @@ -53,7 +53,7 @@ enum Waiter { impl Waiter { fn register(&mut self, waker: &Waker) { match self { - Self::Waiting(w) if waker.will_wake(w) => {}, + Self::Waiting(w) if waker.will_wake(w) => {} _ => *self = Self::Waiting(waker.clone()), } } @@ -61,7 +61,7 @@ impl Waiter { fn wake(&mut self) { match mem::replace(self, Self::Woken) { Self::Waiting(waker) => waker.wake(), - Self::Woken => {}, + Self::Woken => {} } } } @@ -113,10 +113,7 @@ impl<T: ?Sized> Mutex<T> { /// This method returns a future that will resolve once the lock has been /// successfully acquired. pub fn lock(&self) -> MutexLockFuture<'_, T> { - MutexLockFuture { - mutex: Some(self), - wait_key: WAIT_KEY_NONE, - } + MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } } /// Returns a mutable reference to the underlying data. @@ -145,7 +142,7 @@ impl<T: ?Sized> Mutex<T> { if wait_key != WAIT_KEY_NONE { let mut waiters = self.waiters.lock().unwrap(); match waiters.remove(wait_key) { - Waiter::Waiting(_) => {}, + Waiter::Waiting(_) => {} Waiter::Woken => { // We were awoken, but then dropped before we could // wake up to acquire the lock. Wake up another @@ -191,13 +188,10 @@ impl<T: ?Sized> fmt::Debug for MutexLockFuture<'_, T> { f.debug_struct("MutexLockFuture") .field("was_acquired", &self.mutex.is_none()) .field("mutex", &self.mutex) - .field("wait_key", &( - if self.wait_key == WAIT_KEY_NONE { - None - } else { - Some(self.wait_key) - } - )) + .field( + "wait_key", + &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), + ) .finish() } } @@ -295,10 +289,7 @@ impl<'a, T: ?Sized> MutexGuard<'a, T> { impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MutexGuard") - .field("value", &&**self) - .field("mutex", &self.mutex) - .finish() + f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish() } } diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs index 8c58f4f..c6ea548 100644 --- a/src/sink/buffer.rs +++ b/src/sink/buffer.rs @@ -1,10 +1,10 @@ +use alloc::collections::VecDeque; +use core::pin::Pin; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_project_lite::pin_project; -use core::pin::Pin; -use alloc::collections::VecDeque; pin_project! { /// Sink for the [`buffer`](super::SinkExt::buffer) method. @@ -22,19 +22,12 @@ pin_project! { impl<Si: Sink<Item>, Item> Buffer<Si, Item> { pub(super) fn new(sink: Si, capacity: usize) -> Self { - Self { - sink, - buf: VecDeque::with_capacity(capacity), - capacity, - } + Self { sink, buf: VecDeque::with_capacity(capacity), capacity } } delegate_access_inner!(sink, Si, ()); - fn try_empty_buffer( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Si::Error>> { + fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { let mut this = self.project(); ready!(this.sink.as_mut().poll_ready(cx))?; while let Some(item) = this.buf.pop_front() { @@ -48,7 +41,10 @@ impl<Si: Sink<Item>, Item> Buffer<Si, Item> { } // Forwarding impl of Stream from the underlying sink -impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream { +impl<S, Item> Stream for Buffer<S, Item> +where + S: Sink<Item> + Stream, +{ type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { @@ -60,7 +56,10 @@ impl<S, Item> Stream for Buffer<S, Item> where S: Sink<Item> + Stream { } } -impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream { +impl<S, Item> FusedStream for Buffer<S, Item> +where + S: Sink<Item> + FusedStream, +{ fn is_terminated(&self) -> bool { self.sink.is_terminated() } @@ -69,10 +68,7 @@ impl<S, Item> FusedStream for Buffer<S, Item> where S: Sink<Item> + FusedStream impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { type Error = Si::Error; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { if self.capacity == 0 { return self.project().sink.poll_ready(cx); } @@ -86,10 +82,7 @@ impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { } } - fn start_send( - self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { if self.capacity == 0 { self.project().sink.start_send(item) } else { @@ -99,20 +92,14 @@ impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { } #[allow(clippy::debug_assert_with_mut_call)] - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().try_empty_buffer(cx))?; debug_assert!(self.buf.is_empty()); self.project().sink.poll_flush(cx) } #[allow(clippy::debug_assert_with_mut_call)] - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().try_empty_buffer(cx))?; debug_assert!(self.buf.is_empty()); self.project().sink.poll_close(cx) diff --git a/src/sink/close.rs b/src/sink/close.rs index 4fc99f5..43eea74 100644 --- a/src/sink/close.rs +++ b/src/sink/close.rs @@ -19,20 +19,14 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Close<'_, Si, Item> {} /// The sink itself is returned after closing is complete. impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Close<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si) -> Self { - Self { - sink, - _phantom: PhantomData, - } + Self { sink, _phantom: PhantomData } } } impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Close<'_, Si, Item> { type Output = Result<(), Si::Error>; - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Pin::new(&mut self.sink).poll_close(cx) } } diff --git a/src/sink/drain.rs b/src/sink/drain.rs index 33c5b31..5295115 100644 --- a/src/sink/drain.rs +++ b/src/sink/drain.rs @@ -35,31 +35,19 @@ impl<T> Unpin for Drain<T> {} impl<T> Sink<T> for Drain<T> { type Error = Never; - fn poll_ready( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn start_send( - self: Pin<&mut Self>, - _item: T, - ) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, _item: T) -> Result<(), Self::Error> { Ok(()) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } } diff --git a/src/sink/err_into.rs b/src/sink/err_into.rs index 3eb9940..a64d133 100644 --- a/src/sink/err_into.rs +++ b/src/sink/err_into.rs @@ -1,6 +1,6 @@ use crate::sink::{SinkExt, SinkMapErr}; -use futures_core::stream::{Stream, FusedStream}; -use futures_sink::{Sink}; +use futures_core::stream::{FusedStream, Stream}; +use futures_sink::Sink; use pin_project_lite::pin_project; pin_project! { @@ -14,21 +14,21 @@ pin_project! { } impl<Si, E, Item> SinkErrInto<Si, Item, E> - where Si: Sink<Item>, - Si::Error: Into<E>, +where + Si: Sink<Item>, + Si::Error: Into<E>, { pub(super) fn new(sink: Si) -> Self { - Self { - sink: SinkExt::sink_map_err(sink, Into::into), - } + Self { sink: SinkExt::sink_map_err(sink, Into::into) } } delegate_access_inner!(sink, Si, (.)); } impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E> - where Si: Sink<Item>, - Si::Error: Into<E>, +where + Si: Sink<Item>, + Si::Error: Into<E>, { type Error = E; @@ -37,8 +37,9 @@ impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E> // Forwarding impl of Stream from the underlying sink impl<S, Item, E> Stream for SinkErrInto<S, Item, E> - where S: Sink<Item> + Stream, - S::Error: Into<E> +where + S: Sink<Item> + Stream, + S::Error: Into<E>, { type Item = S::Item; @@ -46,8 +47,9 @@ impl<S, Item, E> Stream for SinkErrInto<S, Item, E> } impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E> - where S: Sink<Item> + FusedStream, - S::Error: Into<E> +where + S: Sink<Item> + FusedStream, + S::Error: Into<E>, { fn is_terminated(&self) -> bool { self.sink.is_terminated() diff --git a/src/sink/fanout.rs b/src/sink/fanout.rs index f351e86..fe2038f 100644 --- a/src/sink/fanout.rs +++ b/src/sink/fanout.rs @@ -50,36 +50,32 @@ impl<Si1, Si2> Fanout<Si1, Si2> { impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - f.debug_struct("Fanout") - .field("sink1", &self.sink1) - .field("sink2", &self.sink2) - .finish() + f.debug_struct("Fanout").field("sink1", &self.sink1).field("sink2", &self.sink2).finish() } } impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2> - where Si1: Sink<Item>, - Item: Clone, - Si2: Sink<Item, Error=Si1::Error> +where + Si1: Sink<Item>, + Item: Clone, + Si2: Sink<Item, Error = Si1::Error>, { type Error = Si1::Error; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { let this = self.project(); let sink1_ready = this.sink1.poll_ready(cx)?.is_ready(); let sink2_ready = this.sink2.poll_ready(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; - if ready { Poll::Ready(Ok(())) } else { Poll::Pending } + if ready { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } - fn start_send( - self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { let this = self.project(); this.sink1.start_send(item.clone())?; @@ -87,27 +83,29 @@ impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2> Ok(()) } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { let this = self.project(); let sink1_ready = this.sink1.poll_flush(cx)?.is_ready(); let sink2_ready = this.sink2.poll_flush(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; - if ready { Poll::Ready(Ok(())) } else { Poll::Pending } + if ready { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { let this = self.project(); let sink1_ready = this.sink1.poll_close(cx)?.is_ready(); let sink2_ready = this.sink2.poll_close(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; - if ready { Poll::Ready(Ok(())) } else { Poll::Pending } + if ready { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } } diff --git a/src/sink/feed.rs b/src/sink/feed.rs index 06df9a9..6701f7a 100644 --- a/src/sink/feed.rs +++ b/src/sink/feed.rs @@ -17,10 +17,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Feed<'_, Si, Item> {} impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { - Feed { - sink, - item: Some(item), - } + Feed { sink, item: Some(item) } } pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> { @@ -35,10 +32,7 @@ impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> { impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> { type Output = Result<(), Si::Error>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = self.get_mut(); let mut sink = Pin::new(&mut this.sink); ready!(sink.as_mut().poll_ready(cx))?; diff --git a/src/sink/flush.rs b/src/sink/flush.rs index c06a221..35a8372 100644 --- a/src/sink/flush.rs +++ b/src/sink/flush.rs @@ -23,20 +23,14 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Flush<'_, Si, Item> {} /// all current requests are processed. impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Flush<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si) -> Self { - Self { - sink, - _phantom: PhantomData, - } + Self { sink, _phantom: PhantomData } } } impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Flush<'_, Si, Item> { type Output = Result<(), Si::Error>; - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Pin::new(&mut self.sink).poll_flush(cx) } } diff --git a/src/sink/map_err.rs b/src/sink/map_err.rs index 2829344..9d2ab7b 100644 --- a/src/sink/map_err.rs +++ b/src/sink/map_err.rs @@ -1,7 +1,7 @@ use core::pin::Pin; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use futures_sink::{Sink}; +use futures_sink::Sink; use pin_project_lite::pin_project; pin_project! { @@ -28,36 +28,25 @@ impl<Si, F> SinkMapErr<Si, F> { } impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F> - where Si: Sink<Item>, - F: FnOnce(Si::Error) -> E, +where + Si: Sink<Item>, + F: FnOnce(Si::Error) -> E, { type Error = E; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e)) } - fn start_send( - mut self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e)) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e)) } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e)) } } diff --git a/src/sink/mod.rs b/src/sink/mod.rs index e5b515b..147e9ad 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -243,7 +243,8 @@ pub trait SinkExt<Item>: Sink<Item> { /// This future will drive the stream to keep producing items until it is /// exhausted, sending each item to the sink. It will complete once both the /// stream is exhausted, the sink has received all items, and the sink has - /// been flushed. Note that the sink is **not** closed. + /// been flushed. Note that the sink is **not** closed. If the stream produces + /// an error, that error will be returned by this future without flushing the sink. /// /// Doing `sink.send_all(stream)` is roughly equivalent to /// `stream.forward(sink)`. The returned future will exhaust all items from diff --git a/src/sink/send.rs b/src/sink/send.rs index 384c22c..6d21f33 100644 --- a/src/sink/send.rs +++ b/src/sink/send.rs @@ -17,19 +17,14 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {} impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { - Self { - feed: Feed::new(sink, item), - } + Self { feed: Feed::new(sink, item) } } } impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> { type Output = Result<(), Si::Error>; - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = &mut *self; if this.feed.is_item_pending() { diff --git a/src/sink/send_all.rs b/src/sink/send_all.rs index 6a33459..1302dd2 100644 --- a/src/sink/send_all.rs +++ b/src/sink/send_all.rs @@ -1,9 +1,9 @@ -use crate::stream::{StreamExt, TryStreamExt, Fuse}; +use crate::stream::{Fuse, StreamExt, TryStreamExt}; use core::fmt; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; -use futures_core::stream::{TryStream, Stream}; +use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; @@ -40,22 +40,16 @@ impl<Si, St> Unpin for SendAll<'_, Si, St> where Si: Unpin + ?Sized, St: TryStream + Unpin + ?Sized, -{} +{ +} impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St> where Si: Sink<Ok, Error = Error> + Unpin + ?Sized, St: TryStream<Ok = Ok, Error = Error> + Stream + Unpin + ?Sized, { - pub(super) fn new( - sink: &'a mut Si, - stream: &'a mut St, - ) -> Self { - Self { - sink, - stream: stream.fuse(), - buffered: None, - } + pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> Self { + Self { sink, stream: stream.fuse(), buffered: None } } fn try_start_send( @@ -65,9 +59,7 @@ where ) -> Poll<Result<(), Si::Error>> { debug_assert!(self.buffered.is_none()); match Pin::new(&mut self.sink).poll_ready(cx)? { - Poll::Ready(()) => { - Poll::Ready(Pin::new(&mut self.sink).start_send(item)) - } + Poll::Ready(()) => Poll::Ready(Pin::new(&mut self.sink).start_send(item)), Poll::Pending => { self.buffered = Some(item); Poll::Pending @@ -83,10 +75,7 @@ where { type Output = Result<(), Error>; - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = &mut *self; // If we've got an item buffered already, we need to write it to the // sink before we can do anything else @@ -96,16 +85,14 @@ where loop { match this.stream.try_poll_next_unpin(cx)? { - Poll::Ready(Some(item)) => { - ready!(this.try_start_send(cx, item))? - } + Poll::Ready(Some(item)) => ready!(this.try_start_send(cx, item))?, Poll::Ready(None) => { ready!(Pin::new(&mut this.sink).poll_flush(cx))?; - return Poll::Ready(Ok(())) + return Poll::Ready(Ok(())); } Poll::Pending => { ready!(Pin::new(&mut this.sink).poll_flush(cx))?; - return Poll::Pending + return Poll::Pending; } } } diff --git a/src/sink/unfold.rs b/src/sink/unfold.rs index 3903716..330a068 100644 --- a/src/sink/unfold.rs +++ b/src/sink/unfold.rs @@ -41,10 +41,7 @@ where F: FnMut(T, Item) -> R, R: Future<Output = Result<T, E>>, { - assert_sink::<Item, E, _>(Unfold { - function, - state: UnfoldState::Value { value: init }, - }) + assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } }) } impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R> diff --git a/src/sink/with.rs b/src/sink/with.rs index 73b87b7..86d3dcc 100644 --- a/src/sink/with.rs +++ b/src/sink/with.rs @@ -27,29 +27,22 @@ where Fut: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("With") - .field("sink", &self.sink) - .field("state", &self.state) - .finish() + f.debug_struct("With").field("sink", &self.sink).field("state", &self.state).finish() } } impl<Si, Item, U, Fut, F> With<Si, Item, U, Fut, F> -where Si: Sink<Item>, - F: FnMut(U) -> Fut, - Fut: Future, +where + Si: Sink<Item>, + F: FnMut(U) -> Fut, + Fut: Future, { pub(super) fn new<E>(sink: Si, f: F) -> Self - where - Fut: Future<Output = Result<Item, E>>, - E: From<Si::Error>, + where + Fut: Future<Output = Result<Item, E>>, + E: From<Si::Error>, { - Self { - state: None, - sink, - f, - _phantom: PhantomData, - } + Self { state: None, sink, f, _phantom: PhantomData } } } @@ -71,9 +64,10 @@ where // Forwarding impl of Stream from the underlying sink impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F> - where S: Stream + Sink<Item>, - F: FnMut(U) -> Fut, - Fut: Future +where + S: Stream + Sink<Item>, + F: FnMut(U) -> Fut, + Fut: Future, { type Item = S::Item; @@ -81,18 +75,16 @@ impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F> } impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F> - where Si: Sink<Item>, - F: FnMut(U) -> Fut, - Fut: Future<Output = Result<Item, E>>, - E: From<Si::Error>, +where + Si: Sink<Item>, + F: FnMut(U) -> Fut, + Fut: Future<Output = Result<Item, E>>, + E: From<Si::Error>, { delegate_access_inner!(sink, Si, ()); /// Completes the processing of previous item if any. - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), E>> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), E>> { let mut this = self.project(); let item = match this.state.as_mut().as_pin_mut() { @@ -106,26 +98,21 @@ impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F> } impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F> - where Si: Sink<Item>, - F: FnMut(U) -> Fut, - Fut: Future<Output = Result<Item, E>>, - E: From<Si::Error>, +where + Si: Sink<Item>, + F: FnMut(U) -> Fut, + Fut: Future<Output = Result<Item, E>>, + E: From<Si::Error>, { type Error = E; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().poll(cx))?; ready!(self.project().sink.poll_ready(cx)?); Poll::Ready(Ok(())) } - fn start_send( - self: Pin<&mut Self>, - item: U, - ) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { let mut this = self.project(); assert!(this.state.is_none()); @@ -133,19 +120,13 @@ impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F> Ok(()) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().poll(cx))?; ready!(self.project().sink.poll_flush(cx)?); Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().poll(cx))?; ready!(self.project().sink.poll_close(cx)?); Poll::Ready(Ok(())) diff --git a/src/sink/with_flat_map.rs b/src/sink/with_flat_map.rs index 4b8d3a2..2ae877a 100644 --- a/src/sink/with_flat_map.rs +++ b/src/sink/with_flat_map.rs @@ -2,7 +2,7 @@ use core::fmt; use core::marker::PhantomData; use core::pin::Pin; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_project_lite::pin_project; @@ -43,21 +43,12 @@ where St: Stream<Item = Result<Item, Si::Error>>, { pub(super) fn new(sink: Si, f: F) -> Self { - Self { - sink, - f, - stream: None, - buffer: None, - _marker: PhantomData, - } + Self { sink, f, stream: None, buffer: None, _marker: PhantomData } } delegate_access_inner!(sink, Si, ()); - fn try_empty_stream( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Si::Error>> { + fn try_empty_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { let mut this = self.project(); if this.buffer.is_some() { @@ -112,17 +103,11 @@ where { type Error = Si::Error; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { self.try_empty_stream(cx) } - fn start_send( - self: Pin<&mut Self>, - item: U, - ) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { let mut this = self.project(); assert!(this.stream.is_none()); @@ -130,18 +115,12 @@ where Ok(()) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().try_empty_stream(cx)?); self.project().sink.poll_flush(cx) } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ready!(self.as_mut().try_empty_stream(cx)?); self.project().sink.poll_close(cx) } diff --git a/src/stream/abortable.rs b/src/stream/abortable.rs new file mode 100644 index 0000000..1fea895 --- /dev/null +++ b/src/stream/abortable.rs @@ -0,0 +1,19 @@ +use super::assert_stream; +use crate::stream::{AbortHandle, Abortable}; +use crate::Stream; + +/// Creates a new `Abortable` stream and an `AbortHandle` which can be used to stop it. +/// +/// This function is a convenient (but less flexible) alternative to calling +/// `AbortHandle::new` and `Abortable::new` manually. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +pub fn abortable<St>(stream: St) -> (Abortable<St>, AbortHandle) +where + St: Stream, +{ + let (handle, reg) = AbortHandle::new_pair(); + let abortable = assert_stream::<St::Item, _>(Abortable::new(stream, reg)); + (abortable, handle) +} diff --git a/src/stream/empty.rs b/src/stream/empty.rs index c629a4b..e4fd873 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -8,16 +8,14 @@ use futures_core::task::{Context, Poll}; #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Empty<T> { - _phantom: PhantomData<T> + _phantom: PhantomData<T>, } /// Creates a stream which contains no elements. /// /// The returned stream will always return `Ready(None)` when polled. pub fn empty<T>() -> Empty<T> { - assert_stream::<T, _>(Empty { - _phantom: PhantomData - }) + assert_stream::<T, _>(Empty { _phantom: PhantomData }) } impl<T> Unpin for Empty<T> {} diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index eda3b27..f596b3b 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -52,10 +52,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let index = self.index; - self.project().data.poll(cx).map(|output| OrderWrapper { - data: output, - index, - }) + self.project().data.poll(cx).map(|output| OrderWrapper { data: output, index }) } } @@ -139,10 +136,7 @@ impl<Fut: Future> FuturesOrdered<Fut> { /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. pub fn push(&mut self, future: Fut) { - let wrapped = OrderWrapper { - data: future, - index: self.next_incoming_index, - }; + let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } diff --git a/src/stream/futures_unordered/iter.rs b/src/stream/futures_unordered/iter.rs index ef7b15a..04db5ee 100644 --- a/src/stream/futures_unordered/iter.rs +++ b/src/stream/futures_unordered/iter.rs @@ -1,41 +1,83 @@ -use super::FuturesUnordered; use super::task::Task; +use super::FuturesUnordered; use core::marker::PhantomData; use core::pin::Pin; use core::sync::atomic::Ordering::Relaxed; -#[derive(Debug)] /// Mutable iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IterPinMut<'a, Fut> { pub(super) task: *const Task<Fut>, pub(super) len: usize, - pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>> + pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>, } -#[derive(Debug)] /// Mutable iterator over all futures in the unordered set. -pub struct IterMut<'a, Fut: Unpin> (pub(super) IterPinMut<'a, Fut>); - #[derive(Debug)] +pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); + /// Immutable iterator over all futures in the unordered set. +#[derive(Debug)] pub struct IterPinRef<'a, Fut> { pub(super) task: *const Task<Fut>, pub(super) len: usize, pub(super) pending_next_all: *mut Task<Fut>, - pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>> + pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>, } -#[derive(Debug)] /// Immutable iterator over all the futures in the unordered set. -pub struct Iter<'a, Fut: Unpin> (pub(super) IterPinRef<'a, Fut>); +#[derive(Debug)] +pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); + +/// Owned iterator over all futures in the unordered set. +#[derive(Debug)] +pub struct IntoIter<Fut: Unpin> { + pub(super) len: usize, + pub(super) inner: FuturesUnordered<Fut>, +} + +impl<Fut: Unpin> Iterator for IntoIter<Fut> { + type Item = Fut; + + fn next(&mut self) -> Option<Self::Item> { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = self.inner.head_all.get_mut(); + + if (*task).is_null() { + return None; + } + + unsafe { + // Moving out of the future is safe because it is `Unpin` + let future = (*(**task).future.get()).take().unwrap(); + + // Mutable access to a previously shared `FuturesUnordered` implies + // that the other threads already released the object before the + // current thread acquired it, so relaxed ordering can be used and + // valid `next_all` checks can be skipped. + let next = (**task).next_all.load(Relaxed); + *task = next; + self.len -= 1; + Some(future) + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (self.len, Some(self.len)) + } +} + +impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {} impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; - fn next(&mut self) -> Option<Pin<&'a mut Fut>> { + fn next(&mut self) -> Option<Self::Item> { if self.task.is_null() { return None; } + unsafe { let future = (*(*self.task).future.get()).as_mut().unwrap(); @@ -60,7 +102,7 @@ impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {} impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { type Item = &'a mut Fut; - fn next(&mut self) -> Option<&'a mut Fut> { + fn next(&mut self) -> Option<Self::Item> { self.0.next().map(Pin::get_mut) } @@ -74,10 +116,11 @@ impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {} impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { type Item = Pin<&'a Fut>; - fn next(&mut self) -> Option<Pin<&'a Fut>> { + fn next(&mut self) -> Option<Self::Item> { if self.task.is_null() { return None; } + unsafe { let future = (*(*self.task).future.get()).as_ref().unwrap(); @@ -85,10 +128,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { // `head_all` was initially read for this iterator implies acquire // ordering for all previously inserted nodes (and we don't need to // read `len_all` again for any other nodes). - let next = (*self.task).spin_next_all( - self.pending_next_all, - Relaxed, - ); + let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); self.task = next; self.len -= 1; Some(Pin::new_unchecked(future)) @@ -105,7 +145,7 @@ impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {} impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { type Item = &'a Fut; - fn next(&mut self) -> Option<&'a Fut> { + fn next(&mut self) -> Option<Self::Item> { self.0.next().map(Pin::get_ref) } @@ -115,3 +155,14 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { } impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {} + +// SAFETY: we do nothing thread-local and there is no interior mutability, +// so the usual structural `Send`/`Sync` apply. +unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {} +unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {} + +unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {} +unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {} + +unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {} +unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {} diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 8dcc551..a25fbe0 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -3,11 +3,8 @@ //! This module is only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -use futures_core::future::Future; -use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll}; -use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError}; use crate::task::AtomicWaker; +use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; use core::fmt::{self, Debug}; use core::iter::FromIterator; @@ -16,20 +13,22 @@ use core::mem; use core::pin::Pin; use core::ptr; use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; -use core::sync::atomic::{AtomicPtr, AtomicBool}; -use alloc::sync::{Arc, Weak}; +use core::sync::atomic::{AtomicBool, AtomicPtr}; +use futures_core::future::Future; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; -pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef}; +pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; use self::task::Task; mod ready_to_run_queue; -use self::ready_to_run_queue::{ReadyToRunQueue, Dequeue}; - +use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; /// A set of futures which may complete in any order. /// @@ -63,18 +62,14 @@ unsafe impl<Fut: Sync> Sync for FuturesUnordered<Fut> {} impl<Fut> Unpin for FuturesUnordered<Fut> {} impl Spawn for FuturesUnordered<FutureObj<'_, ()>> { - fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) - -> Result<(), SpawnError> - { + fn spawn_obj(&self, future_obj: FutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); Ok(()) } } impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> { - fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) - -> Result<(), SpawnError> - { + fn spawn_local_obj(&self, future_obj: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { self.push(future_obj); Ok(()) } @@ -191,24 +186,26 @@ impl<Fut> FuturesUnordered<Fut> { } /// Returns an iterator that allows inspecting each future in the set. - pub fn iter(&self) -> Iter<'_, Fut> where Fut: Unpin { + pub fn iter(&self) -> Iter<'_, Fut> + where + Fut: Unpin, + { Iter(Pin::new(self).iter_pin_ref()) } /// Returns an iterator that allows inspecting each future in the set. - fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { + pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> { let (task, len) = self.atomic_load_head_and_len_all(); + let pending_next_all = self.pending_next_all(); - IterPinRef { - task, - len, - pending_next_all: self.pending_next_all(), - _marker: PhantomData, - } + IterPinRef { task, len, pending_next_all, _marker: PhantomData } } /// Returns an iterator that allows modifying each future in the set. - pub fn iter_mut(&mut self) -> IterMut<'_, Fut> where Fut: Unpin { + pub fn iter_mut(&mut self) -> IterMut<'_, Fut> + where + Fut: Unpin, + { IterMut(Pin::new(self).iter_pin_mut()) } @@ -217,19 +214,9 @@ impl<Fut> FuturesUnordered<Fut> { // `head_all` can be accessed directly and we don't need to spin on // `Task::next_all` since we have exclusive access to the set. let task = *self.head_all.get_mut(); - let len = if task.is_null() { - 0 - } else { - unsafe { - *(*task).len_all.get() - } - }; + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; - IterPinMut { - task, - len, - _marker: PhantomData - } + IterPinMut { task, len, _marker: PhantomData } } /// Returns the current head node and number of futures in the list of all @@ -249,7 +236,7 @@ impl<Fut> FuturesUnordered<Fut> { (task, len) } - /// Releases the task. It destorys the future inside and either drops + /// Releases the task. It destroys the future inside and either drops /// the `Arc<Task>` or transfers ownership to the ready to run queue. /// The task this method is called on must have been unlinked before. fn release_task(&mut self, task: Arc<Task<Fut>>) { @@ -395,9 +382,7 @@ impl<Fut> FuturesUnordered<Fut> { impl<Fut: Future> Stream for FuturesUnordered<Fut> { type Item = Fut::Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<Option<Self::Item>> - { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // Variable to determine how many times it is allowed to poll underlying // futures without yielding. // @@ -469,14 +454,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { // Double check that the call to `release_task` really // happened. Calling it required the task to be unlinked. - debug_assert_eq!( - task.next_all.load(Relaxed), - self.pending_next_all() - ); + debug_assert_eq!(task.next_all.load(Relaxed), self.pending_next_all()); unsafe { debug_assert!((*task.prev_all.get()).is_null()); } - continue + continue; } }; @@ -516,10 +498,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { } } - let mut bomb = Bomb { - task: Some(task), - queue: &mut *self, - }; + let mut bomb = Bomb { task: Some(task), queue: &mut *self }; // Poll the underlying future with the appropriate waker // implementation. This is where a large bit of the unsafety @@ -555,11 +534,9 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { cx.waker().wake_by_ref(); return Poll::Pending; } - continue - } - Poll::Ready(output) => { - return Poll::Ready(Some(output)) + continue; } + Poll::Ready(output) => return Poll::Ready(Some(output)), } } } @@ -576,19 +553,33 @@ impl<Fut> Debug for FuturesUnordered<Fut> { } } +impl<Fut> FuturesUnordered<Fut> { + /// Clears the set, removing all futures. + pub fn clear(&mut self) { + self.clear_head_all(); + + // we just cleared all the tasks, and we have &mut self, so this is safe. + unsafe { self.ready_to_run_queue.clear() }; + + self.is_terminated.store(false, Relaxed); + } + + fn clear_head_all(&mut self) { + while !self.head_all.get_mut().is_null() { + let head = *self.head_all.get_mut(); + let task = unsafe { self.unlink(head) }; + self.release_task(task); + } + } +} + impl<Fut> Drop for FuturesUnordered<Fut> { fn drop(&mut self) { // When a `FuturesUnordered` is dropped we want to drop all futures // associated with it. At the same time though there may be tons of // wakers flying around which contain `Task<Fut>` references // inside them. We'll let those naturally get deallocated. - unsafe { - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = self.unlink(head); - self.release_task(task); - } - } + self.clear_head_all(); // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures @@ -605,13 +596,48 @@ impl<Fut> Drop for FuturesUnordered<Fut> { } } +impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered<Fut> { + type Item = &'a Fut; + type IntoIter = Iter<'a, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered<Fut> { + type Item = &'a mut Fut; + type IntoIter = IterMut<'a, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +} + +impl<Fut: Unpin> IntoIterator for FuturesUnordered<Fut> { + type Item = Fut; + type IntoIter = IntoIter<Fut>; + + fn into_iter(mut self) -> Self::IntoIter { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + let task = *self.head_all.get_mut(); + let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } }; + + IntoIter { len, inner: self } + } +} + impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> { fn from_iter<I>(iter: I) -> Self where I: IntoIterator<Item = Fut>, { let acc = Self::new(); - iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc }) + iter.into_iter().fold(acc, |acc, item| { + acc.push(item); + acc + }) } } diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs index 2105195..5ef6cde 100644 --- a/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/src/stream/futures_unordered/ready_to_run_queue.rs @@ -1,9 +1,9 @@ use crate::task::AtomicWaker; +use alloc::sync::Arc; use core::cell::UnsafeCell; use core::ptr; use core::sync::atomic::AtomicPtr; -use core::sync::atomic::Ordering::{Relaxed, Acquire, Release, AcqRel}; -use alloc::sync::Arc; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; use super::abort::abort; use super::task::Task; @@ -85,25 +85,38 @@ impl<Fut> ReadyToRunQueue<Fut> { pub(super) fn stub(&self) -> *const Task<Fut> { &*self.stub } + + // Clear the queue of tasks. + // + // Note that each task has a strong reference count associated with it + // which is owned by the ready to run queue. This method just pulls out + // tasks and drops their refcounts. + // + // # Safety + // + // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) + // - The caller **must** guarantee unique access to `self` + pub(crate) unsafe fn clear(&self) { + loop { + // SAFETY: We have the guarantee of mutual exclusion required by `dequeue`. + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), + } + } + } } impl<Fut> Drop for ReadyToRunQueue<Fut> { fn drop(&mut self) { // Once we're in the destructor for `Inner<Fut>` we need to clear out // the ready to run queue of tasks if there's anything left in there. - // - // Note that each task has a strong reference count associated with it - // which is owned by the ready to run queue. All tasks should have had - // their futures dropped already by the `FuturesUnordered` destructor - // above, so we're just pulling out tasks and dropping their refcounts. + + // All tasks have had their futures dropped already by the `FuturesUnordered` + // destructor above, and we have &mut self, so this is safe. unsafe { - loop { - match self.dequeue() { - Dequeue::Empty => break, - Dequeue::Inconsistent => abort("inconsistent in drop"), - Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), - } - } + self.clear(); } } } diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs index 261408f..da2cd67 100644 --- a/src/stream/futures_unordered/task.rs +++ b/src/stream/futures_unordered/task.rs @@ -1,11 +1,11 @@ +use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::sync::atomic::{AtomicPtr, AtomicBool}; use core::sync::atomic::Ordering::{self, SeqCst}; -use alloc::sync::{Arc, Weak}; +use core::sync::atomic::{AtomicBool, AtomicPtr}; -use crate::task::{ArcWake, WakerRef, waker_ref}; -use super::ReadyToRunQueue; use super::abort::abort; +use super::ReadyToRunQueue; +use crate::task::{waker_ref, ArcWake, WakerRef}; pub(super) struct Task<Fut> { // The future diff --git a/src/stream/iter.rs b/src/stream/iter.rs index 033dae1..20471c2 100644 --- a/src/stream/iter.rs +++ b/src/stream/iter.rs @@ -27,15 +27,15 @@ impl<I> Unpin for Iter<I> {} /// # }); /// ``` pub fn iter<I>(i: I) -> Iter<I::IntoIter> - where I: IntoIterator, +where + I: IntoIterator, { - assert_stream::<I::Item, _>(Iter { - iter: i.into_iter(), - }) + assert_stream::<I::Item, _>(Iter { iter: i.into_iter() }) } impl<I> Stream for Iter<I> - where I: Iterator, +where + I: Iterator, { type Item = I::Item; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f3b2baa..0b2fc90 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -19,8 +19,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; mod stream; pub use self::stream::{ Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, - StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, + Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, + SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, }; #[cfg(feature = "std")] @@ -36,11 +36,11 @@ pub use self::stream::ReadyChunks; #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub use self::stream::Forward; -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent}; -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] #[cfg(feature = "alloc")] @@ -58,7 +58,7 @@ pub use self::try_stream::{ #[cfg(feature = "std")] pub use self::try_stream::IntoAsyncRead; -#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] +#[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; @@ -104,9 +104,16 @@ cfg_target_has_atomic! { pub use self::futures_unordered::FuturesUnordered; #[cfg(feature = "alloc")] - mod select_all; + pub mod select_all; #[cfg(feature = "alloc")] pub use self::select_all::{select_all, SelectAll}; + + #[cfg(feature = "alloc")] + mod abortable; + #[cfg(feature = "alloc")] + pub use crate::abortable::{Abortable, AbortHandle, AbortRegistration, Aborted}; + #[cfg(feature = "alloc")] + pub use abortable::abortable; } // Just a helper function to ensure the streams we're returning all have the diff --git a/src/stream/once.rs b/src/stream/once.rs index e16fe00..ee21c8b 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -2,7 +2,7 @@ use super::assert_stream; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index cf9f21b..3f9aa87 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -1,6 +1,6 @@ use super::assert_stream; use core::pin::Pin; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; /// Stream for the [`repeat`] function. @@ -25,7 +25,8 @@ pub struct Repeat<T> { /// # }); /// ``` pub fn repeat<T>(item: T) -> Repeat<T> - where T: Clone +where + T: Clone, { assert_stream::<T, _>(Repeat { item }) } @@ -33,7 +34,8 @@ pub fn repeat<T>(item: T) -> Repeat<T> impl<T> Unpin for Repeat<T> {} impl<T> Stream for Repeat<T> - where T: Clone +where + T: Clone, { type Item = T; @@ -47,7 +49,8 @@ impl<T> Stream for Repeat<T> } impl<T> FusedStream for Repeat<T> - where T: Clone, +where + T: Clone, { fn is_terminated(&self) -> bool { false diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index 0255643..f5a81b4 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -1,6 +1,6 @@ use super::assert_stream; use core::pin::Pin; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; /// An stream that repeats elements of type `A` endlessly by @@ -28,8 +28,7 @@ impl<A, F: FnMut() -> A> Stream for RepeatWith<F> { } } -impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F> -{ +impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F> { fn is_terminated(&self) -> bool { false } diff --git a/src/stream/select.rs b/src/stream/select.rs index 2942494..133ac6c 100644 --- a/src/stream/select.rs +++ b/src/stream/select.rs @@ -1,5 +1,5 @@ use super::assert_stream; -use crate::stream::{StreamExt, Fuse}; +use crate::stream::{Fuse, StreamExt}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -29,8 +29,9 @@ pin_project! { /// Note that this function consumes both streams and returns a wrapped /// version of them. pub fn select<St1, St2>(stream1: St1, stream2: St2) -> Select<St1, St2> - where St1: Stream, - St2: Stream<Item = St1::Item> +where + St1: Stream, + St2: Stream<Item = St1::Item>, { assert_stream::<St1::Item, _>(Select { stream1: stream1.fuse(), @@ -75,8 +76,9 @@ impl<St1, St2> Select<St1, St2> { } impl<St1, St2> FusedStream for Select<St1, St2> - where St1: Stream, - St2: Stream<Item = St1::Item> +where + St1: Stream, + St2: Stream<Item = St1::Item>, { fn is_terminated(&self) -> bool { self.stream1.is_terminated() && self.stream2.is_terminated() @@ -84,15 +86,13 @@ impl<St1, St2> FusedStream for Select<St1, St2> } impl<St1, St2> Stream for Select<St1, St2> - where St1: Stream, - St2: Stream<Item = St1::Item> +where + St1: Stream, + St2: Stream<Item = St1::Item>, { type Item = St1::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<St1::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { let this = self.project(); if !*this.flag { poll_inner(this.flag, this.stream1, this.stream2, cx) @@ -106,24 +106,24 @@ fn poll_inner<St1, St2>( flag: &mut bool, a: Pin<&mut St1>, b: Pin<&mut St2>, - cx: &mut Context<'_> + cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> - where St1: Stream, St2: Stream<Item = St1::Item> +where + St1: Stream, + St2: Stream<Item = St1::Item>, { let a_done = match a.poll_next(cx) { Poll::Ready(Some(item)) => { // give the other stream a chance to go first next time *flag = !*flag; - return Poll::Ready(Some(item)) - }, + return Poll::Ready(Some(item)); + } Poll::Ready(None) => true, Poll::Pending => false, }; match b.poll_next(cx) { - Poll::Ready(Some(item)) => { - Poll::Ready(Some(item)) - } + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), Poll::Ready(None) if a_done => Poll::Ready(None), Poll::Ready(None) | Poll::Pending => Poll::Pending, } diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs index c0b92fa..3474331 100644 --- a/src/stream/select_all.rs +++ b/src/stream/select_all.rs @@ -5,27 +5,32 @@ use core::iter::FromIterator; use core::pin::Pin; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + use super::assert_stream; -use crate::stream::{StreamExt, StreamFuture, FuturesUnordered}; +use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; -/// An unbounded set of streams -/// -/// This "combinator" provides the ability to maintain a set of streams -/// and drive them all to completion. -/// -/// Streams are pushed into this set and their realized values are -/// yielded as they become ready. Streams will only be polled when they -/// generate notifications. This allows to coordinate a large number of streams. -/// -/// Note that you can create a ready-made `SelectAll` via the -/// `select_all` function in the `stream` module, or you can start with an -/// empty set with the `SelectAll::new` constructor. -#[must_use = "streams do nothing unless polled"] -pub struct SelectAll<St> { - inner: FuturesUnordered<StreamFuture<St>>, +pin_project! { + /// An unbounded set of streams + /// + /// This "combinator" provides the ability to maintain a set of streams + /// and drive them all to completion. + /// + /// Streams are pushed into this set and their realized values are + /// yielded as they become ready. Streams will only be polled when they + /// generate notifications. This allows to coordinate a large number of streams. + /// + /// Note that you can create a ready-made `SelectAll` via the + /// `select_all` function in the `stream` module, or you can start with an + /// empty set with the `SelectAll::new` constructor. + #[must_use = "streams do nothing unless polled"] + pub struct SelectAll<St> { + #[pin] + inner: FuturesUnordered<StreamFuture<St>>, + } } impl<St: Debug> Debug for SelectAll<St> { @@ -64,6 +69,21 @@ impl<St: Stream + Unpin> SelectAll<St> { pub fn push(&mut self, stream: St) { self.inner.push(stream.into_future()); } + + /// Returns an iterator that allows inspecting each stream in the set. + pub fn iter(&self) -> Iter<'_, St> { + Iter(self.inner.iter()) + } + + /// Returns an iterator that allows modifying each stream in the set. + pub fn iter_mut(&mut self) -> IterMut<'_, St> { + IterMut(self.inner.iter_mut()) + } + + /// Clears the set, removing all streams. + pub fn clear(&mut self) { + self.inner.clear() + } } impl<St: Stream + Unpin> Default for SelectAll<St> { @@ -75,10 +95,7 @@ impl<St: Stream + Unpin> Default for SelectAll<St> { impl<St: Stream + Unpin> Stream for SelectAll<St> { type Item = St::Item; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { loop { match ready!(self.inner.poll_next_unpin(cx)) { Some((Some(item), remaining)) => { @@ -111,13 +128,14 @@ impl<St: Stream + Unpin> FusedStream for SelectAll<St> { /// streams internally, in the order they become available. /// /// Note that the returned set can also be used to dynamically push more -/// futures into the set as they become available. +/// streams into the set as they become available. /// /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. pub fn select_all<I>(streams: I) -> SelectAll<I::Item> - where I: IntoIterator, - I::Item: Stream + Unpin +where + I: IntoIterator, + I::Item: Stream + Unpin, { let mut set = SelectAll::new(); @@ -141,3 +159,96 @@ impl<St: Stream + Unpin> Extend<St> for SelectAll<St> { } } } + +impl<St: Stream + Unpin> IntoIterator for SelectAll<St> { + type Item = St; + type IntoIter = IntoIter<St>; + + fn into_iter(self) -> Self::IntoIter { + IntoIter(self.inner.into_iter()) + } +} + +impl<'a, St: Stream + Unpin> IntoIterator for &'a SelectAll<St> { + type Item = &'a St; + type IntoIter = Iter<'a, St>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, St: Stream + Unpin> IntoIterator for &'a mut SelectAll<St> { + type Item = &'a mut St; + type IntoIter = IterMut<'a, St>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +} + +/// Immutable iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct Iter<'a, St: Unpin>(futures_unordered::Iter<'a, StreamFuture<St>>); + +/// Mutable iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct IterMut<'a, St: Unpin>(futures_unordered::IterMut<'a, StreamFuture<St>>); + +/// Owned iterator over all streams in the unordered set. +#[derive(Debug)] +pub struct IntoIter<St: Unpin>(futures_unordered::IntoIter<StreamFuture<St>>); + +impl<'a, St: Stream + Unpin> Iterator for Iter<'a, St> { + type Item = &'a St; + + fn next(&mut self) -> Option<Self::Item> { + let st = self.0.next()?; + let next = st.get_ref(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.0.size_hint() + } +} + +impl<St: Stream + Unpin> ExactSizeIterator for Iter<'_, St> {} + +impl<'a, St: Stream + Unpin> Iterator for IterMut<'a, St> { + type Item = &'a mut St; + + fn next(&mut self) -> Option<Self::Item> { + let st = self.0.next()?; + let next = st.get_mut(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.0.size_hint() + } +} + +impl<St: Stream + Unpin> ExactSizeIterator for IterMut<'_, St> {} + +impl<St: Stream + Unpin> Iterator for IntoIter<St> { + type Item = St; + + fn next(&mut self) -> Option<Self::Item> { + let st = self.0.next()?; + let next = st.into_inner(); + // This should always be true because FuturesUnordered removes completed futures. + debug_assert!(next.is_some()); + next + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.0.size_hint() + } +} + +impl<St: Stream + Unpin> ExactSizeIterator for IntoIter<St> {} diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs index de42cfd..d64c142 100644 --- a/src/stream/stream/buffer_unordered.rs +++ b/src/stream/stream/buffer_unordered.rs @@ -1,12 +1,12 @@ use crate::stream::{Fuse, FuturesUnordered, StreamExt}; +use core::fmt; +use core::pin::Pin; use futures_core::future::Future; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::fmt; -use core::pin::Pin; pin_project! { /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) @@ -63,10 +63,7 @@ where { type Item = <St::Item as Future>::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs index 1af9f49..6052a73 100644 --- a/src/stream/stream/buffered.rs +++ b/src/stream/stream/buffered.rs @@ -1,4 +1,6 @@ use crate::stream::{Fuse, FuturesOrdered, StreamExt}; +use core::fmt; +use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; use futures_core::stream::Stream; @@ -6,8 +8,6 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::fmt; -use core::pin::Pin; pin_project! { /// Stream for the [`buffered`](super::StreamExt::buffered) method. @@ -44,11 +44,7 @@ where St::Item: Future, { pub(super) fn new(stream: St, n: usize) -> Self { - Self { - stream: super::Fuse::new(stream), - in_progress_queue: FuturesOrdered::new(), - max: n, - } + Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n } } delegate_access_inner!(stream, St, (.)); @@ -61,10 +57,7 @@ where { type Item = <St::Item as Future>::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up @@ -79,7 +72,7 @@ where // Attempt to pull the next value from the in_progress_queue let res = this.in_progress_queue.poll_next_unpin(cx); if let Some(val) = ready!(res) { - return Poll::Ready(Some(val)) + return Poll::Ready(Some(val)); } // If more values are still coming from the stream, we're not done yet diff --git a/src/stream/stream/catch_unwind.rs b/src/stream/stream/catch_unwind.rs index d87a40a..09a6dc1 100644 --- a/src/stream/stream/catch_unwind.rs +++ b/src/stream/stream/catch_unwind.rs @@ -1,9 +1,9 @@ -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; use std::any::Any; +use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe}; use std::pin::Pin; -use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; pin_project! { /// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method. @@ -27,25 +27,20 @@ impl<St: Stream + UnwindSafe> CatchUnwind<St> { impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> { type Item = Result<St::Item, Box<dyn Any + Send>>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); if *this.caught_unwind { Poll::Ready(None) } else { - let res = catch_unwind(AssertUnwindSafe(|| { - this.stream.as_mut().poll_next(cx) - })); + let res = catch_unwind(AssertUnwindSafe(|| this.stream.as_mut().poll_next(cx))); match res { Ok(poll) => poll.map(|opt| opt.map(Ok)), Err(e) => { *this.caught_unwind = true; Poll::Ready(Some(Err(e))) - }, + } } } } diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index 2be7104..c5da35e 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -18,20 +18,19 @@ pin_project! { // All interactions with `Pin<&mut Chain<..>>` happen through these methods impl<St1, St2> Chain<St1, St2> -where St1: Stream, - St2: Stream<Item = St1::Item>, +where + St1: Stream, + St2: Stream<Item = St1::Item>, { pub(super) fn new(stream1: St1, stream2: St2) -> Self { - Self { - first: Some(stream1), - second: stream2, - } + Self { first: Some(stream1), second: stream2 } } } impl<St1, St2> FusedStream for Chain<St1, St2> -where St1: Stream, - St2: FusedStream<Item=St1::Item>, +where + St1: Stream, + St2: FusedStream<Item = St1::Item>, { fn is_terminated(&self) -> bool { self.first.is_none() && self.second.is_terminated() @@ -39,19 +38,17 @@ where St1: Stream, } impl<St1, St2> Stream for Chain<St1, St2> -where St1: Stream, - St2: Stream<Item=St1::Item>, +where + St1: Stream, + St2: Stream<Item = St1::Item>, { type Item = St1::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); if let Some(first) = this.first.as_mut().as_pin_mut() { if let Some(item) = ready!(first.poll_next(cx)) { - return Poll::Ready(Some(item)) + return Poll::Ready(Some(item)); } } this.first.set(None); @@ -67,7 +64,7 @@ where St1: Stream, let upper = match (first_upper, second_upper) { (Some(x), Some(y)) => x.checked_add(y), - _ => None + _ => None, }; (lower, upper) diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs index 45a3212..8457869 100644 --- a/src/stream/stream/chunks.rs +++ b/src/stream/stream/chunks.rs @@ -1,13 +1,13 @@ use crate::stream::Fuse; +use alloc::vec::Vec; +use core::mem; +use core::pin::Pin; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::mem; -use core::pin::Pin; -use alloc::vec::Vec; pin_project! { /// Stream for the [`chunks`](super::StreamExt::chunks) method. @@ -21,7 +21,10 @@ pin_project! { } } -impl<St: Stream> Chunks<St> where St: Stream { +impl<St: Stream> Chunks<St> +where + St: Stream, +{ pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); @@ -43,10 +46,7 @@ impl<St: Stream> Chunks<St> where St: Stream { impl<St: Stream> Stream for Chunks<St> { type Item = Vec<St::Item>; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.as_mut().project(); loop { match ready!(this.stream.as_mut().poll_next(cx)) { @@ -56,7 +56,7 @@ impl<St: Stream> Stream for Chunks<St> { Some(item) => { this.items.push(item); if this.items.len() >= *this.cap { - return Poll::Ready(Some(self.take())) + return Poll::Ready(Some(self.take())); } } diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs index 774b34b..b0e81b9 100644 --- a/src/stream/stream/collect.rs +++ b/src/stream/stream/collect.rs @@ -23,16 +23,14 @@ impl<St: Stream, C: Default> Collect<St, C> { } pub(super) fn new(stream: St) -> Self { - Self { - stream, - collection: Default::default(), - } + Self { stream, collection: Default::default() } } } impl<St, C> FusedFuture for Collect<St, C> -where St: FusedStream, - C: Default + Extend<St:: Item> +where + St: FusedStream, + C: Default + Extend<St::Item>, { fn is_terminated(&self) -> bool { self.stream.is_terminated() @@ -40,8 +38,9 @@ where St: FusedStream, } impl<St, C> Future for Collect<St, C> -where St: Stream, - C: Default + Extend<St:: Item> +where + St: Stream, + C: Default + Extend<St::Item>, { type Output = C; diff --git a/src/stream/stream/concat.rs b/src/stream/stream/concat.rs index ee1349f..7e058b2 100644 --- a/src/stream/stream/concat.rs +++ b/src/stream/stream/concat.rs @@ -1,7 +1,7 @@ use core::pin::Pin; -use futures_core::future::{Future, FusedFuture}; +use futures_core::future::{FusedFuture, Future}; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -17,35 +17,28 @@ pin_project! { } impl<St> Concat<St> -where St: Stream, - St::Item: Extend<<St::Item as IntoIterator>::Item> + - IntoIterator + Default, +where + St: Stream, + St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, { pub(super) fn new(stream: St) -> Self { - Self { - stream, - accum: None, - } + Self { stream, accum: None } } } impl<St> Future for Concat<St> -where St: Stream, - St::Item: Extend<<St::Item as IntoIterator>::Item> + - IntoIterator + Default, +where + St: Stream, + St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, { type Output = St::Item; - fn poll( - self: Pin<&mut Self>, cx: &mut Context<'_> - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let mut this = self.project(); loop { match ready!(this.stream.as_mut().poll_next(cx)) { - None => { - return Poll::Ready(this.accum.take().unwrap_or_default()) - } + None => return Poll::Ready(this.accum.take().unwrap_or_default()), Some(e) => { if let Some(a) = this.accum { a.extend(e) @@ -59,9 +52,9 @@ where St: Stream, } impl<St> FusedFuture for Concat<St> -where St: FusedStream, - St::Item: Extend<<St::Item as IntoIterator>::Item> + - IntoIterator + Default, +where + St: FusedStream, + St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, { fn is_terminated(&self) -> bool { self.accum.is_none() && self.stream.is_terminated() diff --git a/src/stream/stream/cycle.rs b/src/stream/stream/cycle.rs index a5b7dc0..507431d 100644 --- a/src/stream/stream/cycle.rs +++ b/src/stream/stream/cycle.rs @@ -21,10 +21,7 @@ where St: Clone + Stream, { pub(super) fn new(stream: St) -> Self { - Self { - orig: stream.clone(), - stream, - } + Self { orig: stream.clone(), stream } } } diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index 7d4c9cb..1cf9d49 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -19,10 +19,7 @@ pin_project! { impl<St: Stream> Enumerate<St> { pub(super) fn new(stream: St) -> Self { - Self { - stream, - count: 0, - } + Self { stream, count: 0 } } delegate_access_inner!(stream, St, ()); @@ -37,10 +34,7 @@ impl<St: Stream + FusedStream> FusedStream for Enumerate<St> { impl<St: Stream> Stream for Enumerate<St> { type Item = (usize, St::Item); - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this = self.project(); match ready!(this.stream.poll_next(cx)) { diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 57de025..ccf1a51 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -1,3 +1,4 @@ +use crate::fns::FnMut1; use core::fmt; use core::pin::Pin; use futures_core::future::Future; @@ -7,7 +8,6 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use crate::fns::FnMut1; pin_project! { /// Stream for the [`filter`](super::StreamExt::filter) method. @@ -41,26 +41,23 @@ where #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<St, Fut, F> Filter<St, Fut, F> -where St: Stream, - F: for<'a> FnMut1<&'a St::Item, Output=Fut>, - Fut: Future<Output = bool>, +where + St: Stream, + F: for<'a> FnMut1<&'a St::Item, Output = Fut>, + Fut: Future<Output = bool>, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - f, - pending_fut: None, - pending_item: None, - } + Self { stream, f, pending_fut: None, pending_item: None } } delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for Filter<St, Fut, F> - where St: Stream + FusedStream, - F: FnMut(&St::Item) -> Fut, - Fut: Future<Output = bool>, +where + St: Stream + FusedStream, + F: FnMut(&St::Item) -> Fut, + Fut: Future<Output = bool>, { fn is_terminated(&self) -> bool { self.pending_fut.is_none() && self.stream.is_terminated() @@ -69,16 +66,14 @@ impl<St, Fut, F> FusedStream for Filter<St, Fut, F> #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 impl<St, Fut, F> Stream for Filter<St, Fut, F> - where St: Stream, - F: for<'a> FnMut1<&'a St::Item, Output=Fut>, - Fut: Future<Output = bool>, +where + St: Stream, + F: for<'a> FnMut1<&'a St::Item, Output = Fut>, + Fut: Future<Output = bool>, { type Item = St::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<St::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { let mut this = self.project(); Poll::Ready(loop { if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { @@ -111,9 +106,10 @@ impl<St, Fut, F> Stream for Filter<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for Filter<S, Fut, F> - where S: Stream + Sink<Item>, - F: FnMut(&S::Item) -> Fut, - Fut: Future<Output = bool>, +where + S: Stream + Sink<Item>, + F: FnMut(&S::Item) -> Fut, + Fut: Future<Output = bool>, { type Error = S::Error; diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index b762fac..02a0a43 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -1,3 +1,4 @@ +use crate::fns::FnMut1; use core::fmt; use core::pin::Pin; use futures_core::future::Future; @@ -7,7 +8,6 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use crate::fns::FnMut1; pin_project! { /// Stream for the [`filter_map`](super::StreamExt::filter_map) method. @@ -35,9 +35,10 @@ where } impl<St, Fut, F> FilterMap<St, Fut, F> - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { pub(super) fn new(stream: St, f: F) -> Self { Self { stream, f, pending: None } @@ -47,9 +48,10 @@ impl<St, Fut, F> FilterMap<St, Fut, F> } impl<St, Fut, F, T> FusedStream for FilterMap<St, Fut, F> - where St: Stream + FusedStream, - F: FnMut1<St::Item, Output=Fut>, - Fut: Future<Output = Option<T>>, +where + St: Stream + FusedStream, + F: FnMut1<St::Item, Output = Fut>, + Fut: Future<Output = Option<T>>, { fn is_terminated(&self) -> bool { self.pending.is_none() && self.stream.is_terminated() @@ -57,16 +59,14 @@ impl<St, Fut, F, T> FusedStream for FilterMap<St, Fut, F> } impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F> - where St: Stream, - F: FnMut1<St::Item, Output=Fut>, - Fut: Future<Output = Option<T>>, +where + St: Stream, + F: FnMut1<St::Item, Output = Fut>, + Fut: Future<Output = Option<T>>, { type Item = T; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<T>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { let mut this = self.project(); Poll::Ready(loop { if let Some(p) = this.pending.as_mut().as_pin_mut() { @@ -100,9 +100,10 @@ impl<St, Fut, F, T> Stream for FilterMap<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for FilterMap<S, Fut, F> - where S: Stream + Sink<Item>, - F: FnMut1<S::Item, Output=Fut>, - Fut: Future, +where + S: Stream + Sink<Item>, + F: FnMut1<S::Item, Output = Fut>, + Fut: Future, { type Error = S::Error; diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index e109c3b..b8b55ec 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -35,24 +35,21 @@ where } impl<St, Fut, T, F> Fold<St, Fut, T, F> -where St: Stream, - F: FnMut(T, St::Item) -> Fut, - Fut: Future<Output = T>, +where + St: Stream, + F: FnMut(T, St::Item) -> Fut, + Fut: Future<Output = T>, { pub(super) fn new(stream: St, f: F, t: T) -> Self { - Self { - stream, - f, - accum: Some(t), - future: None, - } + Self { stream, f, accum: Some(t), future: None } } } impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F> - where St: Stream, - F: FnMut(T, St::Item) -> Fut, - Fut: Future<Output = T>, +where + St: Stream, + F: FnMut(T, St::Item) -> Fut, + Fut: Future<Output = T>, { fn is_terminated(&self) -> bool { self.accum.is_none() && self.future.is_none() @@ -60,9 +57,10 @@ impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F> } impl<St, Fut, T, F> Future for Fold<St, Fut, T, F> - where St: Stream, - F: FnMut(T, St::Item) -> Fut, - Fut: Future<Output = T>, +where + St: Stream, + F: FnMut(T, St::Item) -> Fut, + Fut: Future<Output = T>, { type Output = T; diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index ee90e66..5302b0e 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -32,23 +32,21 @@ where } impl<St, Fut, F> ForEach<St, Fut, F> -where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - f, - future: None, - } + Self { stream, f, future: None } } } impl<St, Fut, F> FusedFuture for ForEach<St, Fut, F> - where St: FusedStream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: FusedStream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { fn is_terminated(&self) -> bool { self.future.is_none() && self.stream.is_terminated() @@ -56,9 +54,10 @@ impl<St, Fut, F> FusedFuture for ForEach<St, Fut, F> } impl<St, Fut, F> Future for ForEach<St, Fut, F> - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { type Output = (); diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs index cee0ba1..6c18753 100644 --- a/src/stream/stream/for_each_concurrent.rs +++ b/src/stream/stream/for_each_concurrent.rs @@ -1,7 +1,7 @@ use crate::stream::{FuturesUnordered, StreamExt}; use core::fmt; -use core::pin::Pin; use core::num::NonZeroUsize; +use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -35,9 +35,10 @@ where } impl<St, Fut, F> ForEachConcurrent<St, Fut, F> -where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { Self { @@ -51,9 +52,10 @@ where St: Stream, } impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F> - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() @@ -61,9 +63,10 @@ impl<St, Fut, F> FusedFuture for ForEachConcurrent<St, Fut, F> } impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future<Output = ()>, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future<Output = ()>, { type Output = (); @@ -80,7 +83,7 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> Poll::Ready(Some(elem)) => { made_progress_this_iter = true; Some(elem) - }, + } Poll::Ready(None) => { stream_completed = true; None @@ -102,9 +105,9 @@ impl<St, Fut, F> Future for ForEachConcurrent<St, Fut, F> Poll::Ready(Some(())) => made_progress_this_iter = true, Poll::Ready(None) => { if this.stream.is_none() { - return Poll::Ready(()) + return Poll::Ready(()); } - }, + } Poll::Pending => {} } diff --git a/src/stream/stream/forward.rs b/src/stream/stream/forward.rs index 2247b21..1fe2427 100644 --- a/src/stream/stream/forward.rs +++ b/src/stream/stream/forward.rs @@ -23,11 +23,7 @@ pin_project! { impl<St, Si, Item> Forward<St, Si, Item> { pub(crate) fn new(stream: St, sink: Si) -> Self { - Self { - sink: Some(sink), - stream: Fuse::new(stream), - buffered_item: None, - } + Self { sink: Some(sink), stream: Fuse::new(stream), buffered_item: None } } } @@ -48,10 +44,7 @@ where { type Output = Result<(), E>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let ForwardProj { mut sink, mut stream, buffered_item } = self.project(); let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion"); @@ -70,11 +63,11 @@ where Poll::Ready(None) => { ready!(si.poll_close(cx))?; sink.set(None); - return Poll::Ready(Ok(())) + return Poll::Ready(Ok(())); } Poll::Pending => { ready!(si.poll_flush(cx))?; - return Poll::Pending + return Poll::Pending; } } } diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index e1d8c12..fe67813 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -43,10 +43,7 @@ impl<S: Stream> FusedStream for Fuse<S> { impl<S: Stream> Stream for Fuse<S> { type Item = S::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<S::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { let this = self.project(); if *this.done { diff --git a/src/stream/stream/into_future.rs b/src/stream/stream/into_future.rs index a9a1e23..8abfddc 100644 --- a/src/stream/stream/into_future.rs +++ b/src/stream/stream/into_future.rs @@ -79,10 +79,7 @@ impl<St: Stream + Unpin> FusedFuture for StreamFuture<St> { impl<St: Stream + Unpin> Future for StreamFuture<St> { type Output = (Option<St::Item>, St); - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let item = { let s = self.stream.as_mut().expect("polling StreamFuture twice"); ready!(s.poll_next_unpin(cx)) diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index 1a269f0..88bb612 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -24,9 +24,7 @@ where St: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Map") - .field("stream", &self.stream) - .finish() + f.debug_struct("Map").field("stream", &self.stream).finish() } } @@ -39,8 +37,9 @@ impl<St, F> Map<St, F> { } impl<St, F> FusedStream for Map<St, F> - where St: FusedStream, - F: FnMut1<St::Item>, +where + St: FusedStream, + F: FnMut1<St::Item>, { fn is_terminated(&self) -> bool { self.stream.is_terminated() @@ -48,15 +47,13 @@ impl<St, F> FusedStream for Map<St, F> } impl<St, F> Stream for Map<St, F> - where St: Stream, - F: FnMut1<St::Item>, +where + St: Stream, + F: FnMut1<St::Item>, { type Item = F::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); let res = ready!(this.stream.as_mut().poll_next(cx)); Poll::Ready(res.map(|x| this.f.call_mut(x))) @@ -70,8 +67,9 @@ impl<St, F> Stream for Map<St, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<St, F, Item> Sink<Item> for Map<St, F> - where St: Stream + Sink<Item>, - F: FnMut1<St::Item>, +where + St: Stream + Sink<Item>, + F: FnMut1<St::Item>, { type Error = St::Error; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c3340ec..9089e6e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -123,7 +123,7 @@ pub use self::select_next_some::SelectNextSome; mod peek; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::peek::{Peek, Peekable}; +pub use self::peek::{NextIf, NextIfEq, Peek, Peekable}; mod skip; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -919,7 +919,7 @@ pub trait StreamExt: Stream { /// fut.await; /// # }) /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn for_each_concurrent<Fut, F>( self, @@ -1142,7 +1142,7 @@ pub trait StreamExt: Stream { /// /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn buffered(self, n: usize) -> Buffered<Self> where @@ -1187,7 +1187,7 @@ pub trait StreamExt: Stream { /// assert_eq!(buffered.next().await, None); /// # Ok::<(), i32>(()) }).unwrap(); /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> where @@ -1329,7 +1329,8 @@ pub trait StreamExt: Stream { /// the sink is closed. Note that neither the original stream nor provided /// sink will be output by this future. Pass the sink by `Pin<&mut S>` /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in - /// order to preserve access to the `Sink`. + /// order to preserve access to the `Sink`. If the stream produces an error, + /// that error will be returned by this future without flushing/closing the sink. #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] fn forward<S>(self, sink: S) -> Forward<Self, S> @@ -1354,7 +1355,7 @@ pub trait StreamExt: Stream { /// library is activated, and it is activated by default. #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where diff --git a/src/stream/stream/next.rs b/src/stream/stream/next.rs index 6949878..8d8347a 100644 --- a/src/stream/stream/next.rs +++ b/src/stream/stream/next.rs @@ -28,10 +28,7 @@ impl<St: ?Sized + FusedStream + Unpin> FusedFuture for Next<'_, St> { impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { type Output = Option<St::Item>; - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.stream.poll_next_unpin(cx) } } diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs index a403110..217faba 100644 --- a/src/stream/stream/peek.rs +++ b/src/stream/stream/peek.rs @@ -1,5 +1,7 @@ +use crate::fns::FnOnce1; use crate::stream::{Fuse, StreamExt}; use core::fmt; +use core::marker::PhantomData; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::ready; @@ -26,10 +28,7 @@ pin_project! { impl<St: Stream> Peekable<St> { pub(super) fn new(stream: St) -> Self { - Self { - stream: stream.fuse(), - peeked: None, - } + Self { stream: stream.fuse(), peeked: None } } delegate_access_inner!(stream, St, (.)); @@ -44,10 +43,7 @@ impl<St: Stream> Peekable<St> { /// /// This method polls the underlying stream and return either a reference /// to the next item if the stream is ready or passes through any errors. - pub fn poll_peek( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<&St::Item>> { + pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> { let mut this = self.project(); Poll::Ready(loop { @@ -60,6 +56,86 @@ impl<St: Stream> Peekable<St> { } }) } + + /// Creates a future which will consume and return the next value of this + /// stream if a condition is true. + /// + /// If `func` returns `true` for the next value of this stream, consume and + /// return it. Otherwise, return `None`. + /// + /// # Examples + /// + /// Consume a number if it's equal to 0. + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// use futures::pin_mut; + /// + /// let stream = stream::iter(0..5).peekable(); + /// pin_mut!(stream); + /// // The first item of the stream is 0; consume it. + /// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, Some(0)); + /// // The next item returned is now 1, so `consume` will return `false`. + /// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, None); + /// // `next_if` saves the value of the next item if it was not equal to `expected`. + /// assert_eq!(stream.next().await, Some(1)); + /// # }); + /// ``` + /// + /// Consume any number less than 10. + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// use futures::pin_mut; + /// + /// let stream = stream::iter(1..20).peekable(); + /// pin_mut!(stream); + /// // Consume all numbers less than 10 + /// while stream.as_mut().next_if(|&x| x < 10).await.is_some() {} + /// // The next value returned will be 10 + /// assert_eq!(stream.next().await, Some(10)); + /// # }); + /// ``` + pub fn next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F> + where + F: FnOnce(&St::Item) -> bool, + { + NextIf { inner: Some((self, func)) } + } + + /// Creates a future which will consume and return the next item if it is + /// equal to `expected`. + /// + /// # Example + /// + /// Consume a number if it's equal to 0. + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// use futures::pin_mut; + /// + /// let stream = stream::iter(0..5).peekable(); + /// pin_mut!(stream); + /// // The first item of the stream is 0; consume it. + /// assert_eq!(stream.as_mut().next_if_eq(&0).await, Some(0)); + /// // The next item returned is now 1, so `consume` will return `false`. + /// assert_eq!(stream.as_mut().next_if_eq(&0).await, None); + /// // `next_if_eq` saves the value of the next item if it was not equal to `expected`. + /// assert_eq!(stream.next().await, Some(1)); + /// # }); + /// ``` + pub fn next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T> + where + T: ?Sized, + St::Item: PartialEq<T>, + { + NextIfEq { + inner: NextIf { inner: Some((self, NextIfEqFn { expected, _next: PhantomData })) }, + } + } } impl<St: Stream> FusedStream for Peekable<St> { @@ -103,7 +179,7 @@ where } pin_project! { - /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] + /// Future for the [`Peekable::peek`](self::Peekable::peek) method. #[must_use = "futures do nothing unless polled"] pub struct Peek<'a, St: Stream> { inner: Option<Pin<&'a mut Peekable<St>>>, @@ -116,9 +192,7 @@ where St::Item: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Peek") - .field("inner", &self.inner) - .finish() + f.debug_struct("Peek").field("inner", &self.inner).finish() } } @@ -133,6 +207,7 @@ where St: Stream, { type Output = Option<&'a St::Item>; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let inner = self.project().inner; if let Some(peekable) = inner { @@ -144,3 +219,125 @@ where } } } + +pin_project! { + /// Future for the [`Peekable::next_if`](self::Peekable::next_if) method. + #[must_use = "futures do nothing unless polled"] + pub struct NextIf<'a, St: Stream, F> { + inner: Option<(Pin<&'a mut Peekable<St>>, F)>, + } +} + +impl<St, F> fmt::Debug for NextIf<'_, St, F> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NextIf").field("inner", &self.inner.as_ref().map(|(s, _f)| s)).finish() + } +} + +#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 +impl<St, F> FusedFuture for NextIf<'_, St, F> +where + St: Stream, + F: for<'a> FnOnce1<&'a St::Item, Output = bool>, +{ + fn is_terminated(&self) -> bool { + self.inner.is_none() + } +} + +#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 +impl<St, F> Future for NextIf<'_, St, F> +where + St: Stream, + F: for<'a> FnOnce1<&'a St::Item, Output = bool>, +{ + type Output = Option<St::Item>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = self.project().inner; + if let Some((peekable, _)) = inner { + let res = ready!(peekable.as_mut().poll_next(cx)); + + let (peekable, func) = inner.take().unwrap(); + match res { + Some(ref matched) if func.call_once(matched) => Poll::Ready(res), + other => { + let peekable = peekable.project(); + // Since we called `self.next()`, we consumed `self.peeked`. + assert!(peekable.peeked.is_none()); + *peekable.peeked = other; + Poll::Ready(None) + } + } + } else { + panic!("NextIf polled after completion") + } + } +} + +pin_project! { + /// Future for the [`Peekable::next_if_eq`](self::Peekable::next_if_eq) method. + #[must_use = "futures do nothing unless polled"] + pub struct NextIfEq<'a, St: Stream, T: ?Sized> { + #[pin] + inner: NextIf<'a, St, NextIfEqFn<'a, T, St::Item>>, + } +} + +impl<St, T> fmt::Debug for NextIfEq<'_, St, T> +where + St: Stream + fmt::Debug, + St::Item: fmt::Debug, + T: ?Sized, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NextIfEq") + .field("inner", &self.inner.inner.as_ref().map(|(s, _f)| s)) + .finish() + } +} + +impl<St, T> FusedFuture for NextIfEq<'_, St, T> +where + St: Stream, + T: ?Sized, + St::Item: PartialEq<T>, +{ + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +impl<St, T> Future for NextIfEq<'_, St, T> +where + St: Stream, + T: ?Sized, + St::Item: PartialEq<T>, +{ + type Output = Option<St::Item>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.project().inner.poll(cx) + } +} + +struct NextIfEqFn<'a, T: ?Sized, Item> { + expected: &'a T, + _next: PhantomData<Item>, +} + +impl<T, Item> FnOnce1<&Item> for NextIfEqFn<'_, T, Item> +where + T: ?Sized, + Item: PartialEq<T>, +{ + type Output = bool; + + fn call_once(self, next: &Item) -> Self::Output { + next == self.expected + } +} diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs index b6e3e5c..5ebc958 100644 --- a/src/stream/stream/ready_chunks.rs +++ b/src/stream/stream/ready_chunks.rs @@ -1,12 +1,12 @@ use crate::stream::Fuse; -use futures_core::stream::{Stream, FusedStream}; +use alloc::vec::Vec; +use core::mem; +use core::pin::Pin; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::mem; -use core::pin::Pin; -use alloc::vec::Vec; pin_project! { /// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method. @@ -20,7 +20,10 @@ pin_project! { } } -impl<St: Stream> ReadyChunks<St> where St: Stream { +impl<St: Stream> ReadyChunks<St> +where + St: Stream, +{ pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); @@ -37,10 +40,7 @@ impl<St: Stream> ReadyChunks<St> where St: Stream { impl<St: Stream> Stream for ReadyChunks<St> { type Item = Vec<St::Item>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); loop { @@ -61,7 +61,10 @@ impl<St: Stream> Stream for ReadyChunks<St> { Poll::Ready(Some(item)) => { this.items.push(item); if this.items.len() >= *this.cap { - return Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap)))) + return Poll::Ready(Some(mem::replace( + this.items, + Vec::with_capacity(*this.cap), + ))); } } diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index 2097280..8724145 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -56,14 +56,7 @@ where Fut: Future<Output = Option<B>>, { pub(super) fn new(stream: St, initial_state: S, f: F) -> Self { - Self { - stream, - state_f: Some(StateFn { - state: initial_state, - f, - }), - future: None, - } + Self { stream, state_f: Some(StateFn { state: initial_state, f }), future: None } } delegate_access_inner!(stream, St, ()); diff --git a/src/stream/stream/select_next_some.rs b/src/stream/stream/select_next_some.rs index fe7a089..3115e14 100644 --- a/src/stream/stream/select_next_some.rs +++ b/src/stream/stream/select_next_some.rs @@ -1,9 +1,9 @@ +use crate::stream::StreamExt; use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; use futures_core::ready; use futures_core::stream::FusedStream; -use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; -use crate::stream::StreamExt; /// Future for the [`select_next_some`](super::StreamExt::select_next_some) /// method. diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index 6ffcf57..f495779 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -19,10 +19,7 @@ pin_project! { impl<St: Stream> Skip<St> { pub(super) fn new(stream: St, n: usize) -> Self { - Self { - stream, - remaining: n, - } + Self { stream, remaining: n } } delegate_access_inner!(stream, St, ()); @@ -37,10 +34,7 @@ impl<St: FusedStream> FusedStream for Skip<St> { impl<St: Stream> Stream for Skip<St> { type Item = St::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<St::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { let mut this = self.project(); while *this.remaining > 0 { @@ -57,11 +51,8 @@ impl<St: Stream> Stream for Skip<St> { fn size_hint(&self) -> (usize, Option<usize>) { let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_sub(self.remaining as usize); - let upper = match upper { - Some(x) => Some(x.saturating_sub(self.remaining as usize)), - None => None, - }; + let lower = lower.saturating_sub(self.remaining); + let upper = upper.map(|x| x.saturating_sub(self.remaining)); (lower, upper) } diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index e1aa3f9..50a21a2 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -39,27 +39,23 @@ where } impl<St, Fut, F> SkipWhile<St, Fut, F> - where St: Stream, - F: FnMut(&St::Item) -> Fut, - Fut: Future<Output = bool>, +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future<Output = bool>, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - f, - pending_fut: None, - pending_item: None, - done_skipping: false, - } + Self { stream, f, pending_fut: None, pending_item: None, done_skipping: false } } delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F> - where St: FusedStream, - F: FnMut(&St::Item) -> Fut, - Fut: Future<Output = bool>, +where + St: FusedStream, + F: FnMut(&St::Item) -> Fut, + Fut: Future<Output = bool>, { fn is_terminated(&self) -> bool { self.pending_item.is_none() && self.stream.is_terminated() @@ -67,16 +63,14 @@ impl<St, Fut, F> FusedStream for SkipWhile<St, Fut, F> } impl<St, Fut, F> Stream for SkipWhile<St, Fut, F> - where St: Stream, - F: FnMut(&St::Item) -> Fut, - Fut: Future<Output = bool>, +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future<Output = bool>, { type Item = St::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<St::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { let mut this = self.project(); if *this.done_skipping { @@ -119,9 +113,10 @@ impl<St, Fut, F> Stream for SkipWhile<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for SkipWhile<S, Fut, F> - where S: Stream + Sink<Item>, - F: FnMut(&S::Item) -> Fut, - Fut: Future<Output = bool>, +where + S: Stream + Sink<Item>, + F: FnMut(&S::Item) -> Fut, + Fut: Future<Output = bool>, { type Error = S::Error; diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs index 997b974..3a72fee 100644 --- a/src/stream/stream/split.rs +++ b/src/stream/stream/split.rs @@ -1,9 +1,9 @@ +use core::fmt; +use core::pin::Pin; use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use core::fmt; -use core::pin::Pin; use crate::lock::BiLock; @@ -20,7 +20,8 @@ impl<S: Unpin> SplitStream<S> { /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>> - where S: Sink<Item>, + where + S: Sink<Item>, { other.reunite(self) } @@ -36,10 +37,7 @@ impl<S: Stream> Stream for SplitStream<S> { #[allow(bad_style)] fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> { - SplitSink { - lock, - slot: None, - } + SplitSink { lock, slot: None } } /// A `Sink` part of the split pair @@ -58,14 +56,16 @@ impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> { /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are /// a matching pair originating from the same call to `StreamExt::split`. pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> { - self.lock.reunite(other.0).map_err(|err| { - ReuniteError(SplitSink(err.0), SplitStream(err.1)) - }) + self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1))) } } impl<S: Sink<Item>, Item> SplitSink<S, Item> { - fn poll_flush_slot(mut inner: Pin<&mut S>, slot: &mut Option<Item>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> { + fn poll_flush_slot( + mut inner: Pin<&mut S>, + slot: &mut Option<Item>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), S::Error>> { if slot.is_some() { ready!(inner.as_mut().poll_ready(cx))?; Poll::Ready(inner.start_send(slot.take().unwrap())) @@ -74,7 +74,10 @@ impl<S: Sink<Item>, Item> SplitSink<S, Item> { } } - fn poll_lock_and_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> { + fn poll_lock_and_flush_slot( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), S::Error>> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx) @@ -127,9 +130,7 @@ pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>); impl<T, Item> fmt::Debug for ReuniteError<T, Item> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ReuniteError") - .field(&"...") - .finish() + f.debug_tuple("ReuniteError").field(&"...").finish() } } diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 124d397..b1c728e 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -1,7 +1,7 @@ use core::cmp; use core::pin::Pin; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -20,24 +20,19 @@ pin_project! { impl<St: Stream> Take<St> { pub(super) fn new(stream: St, n: usize) -> Self { - Self { - stream, - remaining: n, - } + Self { stream, remaining: n } } delegate_access_inner!(stream, St, ()); } impl<St> Stream for Take<St> - where St: Stream, +where + St: Stream, { type Item = St::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<St::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { if self.remaining == 0 { Poll::Ready(None) } else { @@ -63,7 +58,7 @@ impl<St> Stream for Take<St> let upper = match upper { Some(x) if x < self.remaining as usize => Some(x), - _ => Some(self.remaining as usize) + _ => Some(self.remaining as usize), }; (lower, upper) @@ -71,7 +66,8 @@ impl<St> Stream for Take<St> } impl<St> FusedStream for Take<St> - where St: FusedStream, +where + St: FusedStream, { fn is_terminated(&self) -> bool { self.remaining == 0 || self.stream.is_terminated() @@ -81,7 +77,8 @@ impl<St> FusedStream for Take<St> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Item> Sink<Item> for Take<S> - where S: Stream + Sink<Item>, +where + S: Stream + Sink<Item>, { type Error = S::Error; diff --git a/src/stream/stream/take_until.rs b/src/stream/stream/take_until.rs index 4dea01a..d14f9ce 100644 --- a/src/stream/stream/take_until.rs +++ b/src/stream/stream/take_until.rs @@ -34,10 +34,7 @@ where Fut: Future + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TakeUntil") - .field("stream", &self.stream) - .field("fut", &self.fut) - .finish() + f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish() } } @@ -47,12 +44,7 @@ where Fut: Future, { pub(super) fn new(stream: St, fut: Fut) -> Self { - Self { - stream, - fut: Some(fut), - fut_result: None, - free: false, - } + Self { stream, fut: Some(fut), fut_result: None, free: false } } delegate_access_inner!(stream, St, ()); diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 4cdba83..01b2765 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -2,7 +2,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; -use futures_core::stream::{Stream, FusedStream}; +use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -39,34 +39,27 @@ where } impl<St, Fut, F> TakeWhile<St, Fut, F> - where St: Stream, - F: FnMut(&St::Item) -> Fut, - Fut: Future<Output = bool>, +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future<Output = bool>, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - f, - pending_fut: None, - pending_item: None, - done_taking: false, - } + Self { stream, f, pending_fut: None, pending_item: None, done_taking: false } } delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for TakeWhile<St, Fut, F> - where St: Stream, - F: FnMut(&St::Item) -> Fut, - Fut: Future<Output = bool>, +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future<Output = bool>, { type Item = St::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<St::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { if self.done_taking { return Poll::Ready(None); } @@ -109,9 +102,10 @@ impl<St, Fut, F> Stream for TakeWhile<St, Fut, F> } impl<St, Fut, F> FusedStream for TakeWhile<St, Fut, F> - where St: FusedStream, - F: FnMut(&St::Item) -> Fut, - Fut: Future<Output = bool>, +where + St: FusedStream, + F: FnMut(&St::Item) -> Fut, + Fut: Future<Output = bool>, { fn is_terminated(&self) -> bool { self.done_taking || self.pending_item.is_none() && self.stream.is_terminated() @@ -121,7 +115,8 @@ impl<St, Fut, F> FusedStream for TakeWhile<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for TakeWhile<S, Fut, F> - where S: Stream + Sink<Item>, +where + S: Stream + Sink<Item>, { type Error = S::Error; diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs index 3d42bdd..d4531d4 100644 --- a/src/stream/stream/then.rs +++ b/src/stream/stream/then.rs @@ -26,32 +26,27 @@ where Fut: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Then") - .field("stream", &self.stream) - .field("future", &self.future) - .finish() + f.debug_struct("Then").field("stream", &self.stream).field("future", &self.future).finish() } } impl<St, Fut, F> Then<St, Fut, F> - where St: Stream, - F: FnMut(St::Item) -> Fut, +where + St: Stream, + F: FnMut(St::Item) -> Fut, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - future: None, - f, - } + Self { stream, future: None, f } } delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for Then<St, Fut, F> - where St: FusedStream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: FusedStream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { fn is_terminated(&self) -> bool { self.future.is_none() && self.stream.is_terminated() @@ -59,16 +54,14 @@ impl<St, Fut, F> FusedStream for Then<St, Fut, F> } impl<St, Fut, F> Stream for Then<St, Fut, F> - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { type Item = Fut::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); Poll::Ready(loop { @@ -99,7 +92,8 @@ impl<St, Fut, F> Stream for Then<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for Then<S, Fut, F> - where S: Sink<Item>, +where + S: Sink<Item>, { type Error = S::Error; diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index 5024770..15f22e8 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -21,25 +21,19 @@ pin_project! { impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> { fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { let this = self.project(); - ( - mem::replace(this.left, Default::default()), - mem::replace(this.right, Default::default()), - ) + (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) } pub(super) fn new(stream: St) -> Self { - Self { - stream, - left: Default::default(), - right: Default::default(), - } + Self { stream, left: Default::default(), right: Default::default() } } } impl<St, A, B, FromA, FromB> FusedFuture for Unzip<St, FromA, FromB> -where St: FusedStream<Item = (A, B)>, - FromA: Default + Extend<A>, - FromB: Default + Extend<B>, +where + St: FusedStream<Item = (A, B)>, + FromA: Default + Extend<A>, + FromB: Default + Extend<B>, { fn is_terminated(&self) -> bool { self.stream.is_terminated() @@ -47,9 +41,10 @@ where St: FusedStream<Item = (A, B)>, } impl<St, A, B, FromA, FromB> Future for Unzip<St, FromA, FromB> -where St: Stream<Item = (A, B)>, - FromA: Default + Extend<A>, - FromB: Default + Extend<B>, +where + St: Stream<Item = (A, B)>, + FromA: Default + Extend<A>, + FromB: Default + Extend<B>, { type Output = (FromA, FromB); @@ -60,7 +55,7 @@ where St: Stream<Item = (A, B)>, Some(e) => { this.left.extend(Some(e.0)); this.right.extend(Some(e.1)); - }, + } None => return Poll::Ready(self.finish()), } } diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 588531a..360a8b6 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -1,4 +1,4 @@ -use crate::stream::{StreamExt, Fuse}; +use crate::stream::{Fuse, StreamExt}; use core::cmp; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; @@ -21,12 +21,7 @@ pin_project! { impl<St1: Stream, St2: Stream> Zip<St1, St2> { pub(super) fn new(stream1: St1, stream2: St2) -> Self { - Self { - stream1: stream1.fuse(), - stream2: stream2.fuse(), - queued1: None, - queued2: None, - } + Self { stream1: stream1.fuse(), stream2: stream2.fuse(), queued1: None, queued2: None } } /// Acquires a reference to the underlying streams that this combinator is @@ -64,7 +59,9 @@ impl<St1: Stream, St2: Stream> Zip<St1, St2> { } impl<St1, St2> FusedStream for Zip<St1, St2> - where St1: Stream, St2: Stream, +where + St1: Stream, + St2: Stream, { fn is_terminated(&self) -> bool { self.stream1.is_terminated() && self.stream2.is_terminated() @@ -72,14 +69,13 @@ impl<St1, St2> FusedStream for Zip<St1, St2> } impl<St1, St2> Stream for Zip<St1, St2> - where St1: Stream, St2: Stream +where + St1: Stream, + St2: Stream, { type Item = (St1::Item, St2::Item); - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); if this.queued1.is_none() { @@ -124,7 +120,7 @@ impl<St1, St2> Stream for Zip<St1, St2> } (Some(x), None) => x.checked_add(queued1_len), (None, Some(y)) => y.checked_add(queued2_len), - (None, None) => None + (None, None) => None, }; (lower, upper) diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs index b185646..a7b50db 100644 --- a/src/stream/try_stream/and_then.rs +++ b/src/stream/try_stream/and_then.rs @@ -2,7 +2,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::ready; -use futures_core::stream::{Stream, TryStream, FusedStream}; +use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -34,9 +34,10 @@ where } impl<St, Fut, F> AndThen<St, Fut, F> - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: TryFuture<Error = St::Error>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture<Error = St::Error>, { pub(super) fn new(stream: St, f: F) -> Self { Self { stream, future: None, f } @@ -46,16 +47,14 @@ impl<St, Fut, F> AndThen<St, Fut, F> } impl<St, Fut, F> Stream for AndThen<St, Fut, F> - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: TryFuture<Error = St::Error>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture<Error = St::Error>, { type Item = Result<Fut::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); Poll::Ready(loop { @@ -84,9 +83,10 @@ impl<St, Fut, F> Stream for AndThen<St, Fut, F> } impl<St, Fut, F> FusedStream for AndThen<St, Fut, F> - where St: TryStream + FusedStream, - F: FnMut(St::Ok) -> Fut, - Fut: TryFuture<Error = St::Error>, +where + St: TryStream + FusedStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture<Error = St::Error>, { fn is_terminated(&self) -> bool { self.future.is_none() && self.stream.is_terminated() @@ -96,7 +96,8 @@ impl<St, Fut, F> FusedStream for AndThen<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for AndThen<S, Fut, F> - where S: Sink<Item>, +where + S: Sink<Item>, { type Error = S::Error; diff --git a/src/stream/try_stream/into_async_read.rs b/src/stream/try_stream/into_async_read.rs index 197c105..914b277 100644 --- a/src/stream/try_stream/into_async_read.rs +++ b/src/stream/try_stream/into_async_read.rs @@ -3,7 +3,7 @@ use core::pin::Pin; use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use futures_io::{AsyncRead, AsyncWrite, AsyncBufRead}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; use std::cmp; use std::io::{Error, Result}; @@ -40,10 +40,7 @@ where St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { - Self { - stream, - state: ReadState::PendingChunk, - } + Self { stream, state: ReadState::PendingChunk } } } @@ -63,9 +60,7 @@ where let chunk = chunk.as_ref(); let len = cmp::min(buf.len(), chunk.len() - *chunk_start); - buf[..len].copy_from_slice( - &chunk[*chunk_start..*chunk_start + len], - ); + buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]); *chunk_start += len; if chunk.len() == *chunk_start { @@ -74,26 +69,21 @@ where return Poll::Ready(Ok(len)); } - ReadState::PendingChunk => { - match ready!(self.stream.try_poll_next_unpin(cx)) { - Some(Ok(chunk)) => { - if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { - chunk, - chunk_start: 0, - }; - } - } - Some(Err(err)) => { - self.state = ReadState::Eof; - return Poll::Ready(Err(err)); - } - None => { - self.state = ReadState::Eof; - return Poll::Ready(Ok(0)); + ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + Some(Ok(chunk)) => { + if !chunk.as_ref().is_empty() { + self.state = ReadState::Ready { chunk, chunk_start: 0 }; } } - } + Some(Err(err)) => { + self.state = ReadState::Eof; + return Poll::Ready(Err(err)); + } + None => { + self.state = ReadState::Eof; + return Poll::Ready(Ok(0)); + } + }, ReadState::Eof => { return Poll::Ready(Ok(0)); } @@ -110,23 +100,17 @@ where fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &[u8] + buf: &[u8], ) -> Poll<Result<usize>> { - Pin::new( &mut self.stream ).poll_write( cx, buf ) + Pin::new(&mut self.stream).poll_write(cx, buf) } - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_> - ) -> Poll<Result<()>> { - Pin::new( &mut self.stream ).poll_flush( cx ) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + Pin::new(&mut self.stream).poll_flush(cx) } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_> - ) -> Poll<Result<()>> { - Pin::new( &mut self.stream ).poll_close( cx ) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + Pin::new(&mut self.stream).poll_close(cx) } } @@ -135,18 +119,12 @@ where St: TryStream<Error = Error> + Unpin, St::Ok: AsRef<[u8]>, { - fn poll_fill_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<&[u8]>> { + fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { while let ReadState::PendingChunk = self.state { match ready!(self.stream.try_poll_next_unpin(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { - chunk, - chunk_start: 0, - }; + self.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { @@ -169,12 +147,11 @@ where Poll::Ready(Ok(&[])) } - fn consume( - mut self: Pin<&mut Self>, - amount: usize, - ) { - // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 - if amount == 0 { return } + fn consume(mut self: Pin<&mut Self>, amount: usize) { + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 + if amount == 0 { + return; + } if let ReadState::Ready { chunk, chunk_start } = &mut self.state { *chunk_start += amount; debug_assert!(*chunk_start <= chunk.as_ref().len()); diff --git a/src/stream/try_stream/into_stream.rs b/src/stream/try_stream/into_stream.rs index 89bc3ef..2126258 100644 --- a/src/stream/try_stream/into_stream.rs +++ b/src/stream/try_stream/into_stream.rs @@ -34,10 +34,7 @@ impl<St: TryStream> Stream for IntoStream<St> { type Item = Result<St::Ok, St::Error>; #[inline] - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { self.project().stream.try_poll_next(cx) } diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index b7353d9..11cd9c0 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -5,18 +5,19 @@ #[cfg(feature = "compat")] use crate::compat::Compat; +use crate::fns::{ + inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn, + IntoFn, MapErrFn, MapOkFn, +}; +use crate::future::assert_future; +use crate::stream::assert_stream; +use crate::stream::{Inspect, Map}; use core::pin::Pin; use futures_core::{ future::{Future, TryFuture}, stream::TryStream, task::{Context, Poll}, }; -use crate::fns::{ - InspectOkFn, inspect_ok_fn, InspectErrFn, inspect_err_fn, MapErrFn, map_err_fn, IntoFn, into_fn, MapOkFn, map_ok_fn, -}; -use crate::future::assert_future; -use crate::stream::{Map, Inspect}; -use crate::stream::assert_stream; mod and_then; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -515,7 +516,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(Err(oneshot::Canceled), fut.await); /// # }) /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn try_for_each_concurrent<Fut, F>( self, @@ -836,7 +837,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where @@ -912,14 +913,16 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); /// ``` - #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] fn try_buffered(self, n: usize) -> TryBuffered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized, { - assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(self, n)) + assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new( + self, n, + )) } // TODO: false positive warning from rustdoc. Verify once #43466 settles diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs index 999123a..cb69e81 100644 --- a/src/stream/try_stream/or_else.rs +++ b/src/stream/try_stream/or_else.rs @@ -2,7 +2,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::ready; -use futures_core::stream::{Stream, TryStream, FusedStream}; +use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -34,9 +34,10 @@ where } impl<St, Fut, F> OrElse<St, Fut, F> - where St: TryStream, - F: FnMut(St::Error) -> Fut, - Fut: TryFuture<Ok = St::Ok>, +where + St: TryStream, + F: FnMut(St::Error) -> Fut, + Fut: TryFuture<Ok = St::Ok>, { pub(super) fn new(stream: St, f: F) -> Self { Self { stream, future: None, f } @@ -46,16 +47,14 @@ impl<St, Fut, F> OrElse<St, Fut, F> } impl<St, Fut, F> Stream for OrElse<St, Fut, F> - where St: TryStream, - F: FnMut(St::Error) -> Fut, - Fut: TryFuture<Ok = St::Ok>, +where + St: TryStream, + F: FnMut(St::Error) -> Fut, + Fut: TryFuture<Ok = St::Ok>, { type Item = Result<St::Ok, Fut::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); Poll::Ready(loop { @@ -68,7 +67,7 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F> Some(Ok(item)) => break Some(Ok(item)), Some(Err(e)) => { this.future.set(Some((this.f)(e))); - }, + } None => break None, } } @@ -88,9 +87,10 @@ impl<St, Fut, F> Stream for OrElse<St, Fut, F> } impl<St, Fut, F> FusedStream for OrElse<St, Fut, F> - where St: TryStream + FusedStream, - F: FnMut(St::Error) -> Fut, - Fut: TryFuture<Ok = St::Ok>, +where + St: TryStream + FusedStream, + F: FnMut(St::Error) -> Fut, + Fut: TryFuture<Ok = St::Ok>, { fn is_terminated(&self) -> bool { self.future.is_none() && self.stream.is_terminated() @@ -100,7 +100,8 @@ impl<St, Fut, F> FusedStream for OrElse<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for OrElse<S, Fut, F> - where S: Sink<Item>, +where + S: Sink<Item>, { type Error = S::Error; diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs index 71c6fc7..9a899d4 100644 --- a/src/stream/try_stream/try_buffer_unordered.rs +++ b/src/stream/try_stream/try_buffer_unordered.rs @@ -1,12 +1,12 @@ -use crate::stream::{Fuse, FuturesUnordered, StreamExt, IntoStream}; use crate::future::{IntoFuture, TryFutureExt}; +use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt}; +use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::pin::Pin; pin_project! { /// Stream for the @@ -24,8 +24,9 @@ pin_project! { } impl<St> TryBufferUnordered<St> - where St: TryStream, - St::Ok: TryFuture, +where + St: TryStream, + St::Ok: TryFuture, { pub(super) fn new(stream: St, n: usize) -> Self { Self { @@ -39,15 +40,13 @@ impl<St> TryBufferUnordered<St> } impl<St> Stream for TryBufferUnordered<St> - where St: TryStream, - St::Ok: TryFuture<Error = St::Error>, +where + St: TryStream, + St::Ok: TryFuture<Error = St::Error>, { type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up @@ -77,8 +76,9 @@ impl<St> Stream for TryBufferUnordered<St> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Item, E> Sink<Item> for TryBufferUnordered<S> - where S: TryStream + Sink<Item, Error = E>, - S::Ok: TryFuture<Error = E>, +where + S: TryStream + Sink<Item, Error = E>, + S::Ok: TryFuture<Error = E>, { type Error = E; diff --git a/src/stream/try_stream/try_buffered.rs b/src/stream/try_stream/try_buffered.rs index ff7e844..45bd3f8 100644 --- a/src/stream/try_stream/try_buffered.rs +++ b/src/stream/try_stream/try_buffered.rs @@ -1,12 +1,12 @@ -use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream}; use crate::future::{IntoFuture, TryFutureExt}; +use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt}; +use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; -use core::pin::Pin; pin_project! { /// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method. @@ -47,10 +47,7 @@ where { type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs index 387de97..5d3b3d7 100644 --- a/src/stream/try_stream/try_collect.rs +++ b/src/stream/try_stream/try_collect.rs @@ -19,10 +19,7 @@ pin_project! { impl<St: TryStream, C: Default> TryCollect<St, C> { pub(super) fn new(s: St) -> Self { - Self { - stream: s, - items: Default::default(), - } + Self { stream: s, items: Default::default() } } } @@ -43,10 +40,7 @@ where { type Output = Result<C, St::Error>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let mut this = self.project(); Poll::Ready(Ok(loop { match ready!(this.stream.as_mut().try_poll_next(cx)?) { diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs index 2451332..58fb6a5 100644 --- a/src/stream/try_stream/try_concat.rs +++ b/src/stream/try_stream/try_concat.rs @@ -22,10 +22,7 @@ where St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, { pub(super) fn new(stream: St) -> Self { - Self { - stream, - accum: None, - } + Self { stream, accum: None } } } diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs index eacefd2..61e6105 100644 --- a/src/stream/try_stream/try_filter.rs +++ b/src/stream/try_stream/try_filter.rs @@ -2,7 +2,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; -use futures_core::stream::{Stream, TryStream, FusedStream}; +use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -40,24 +40,21 @@ where } impl<St, Fut, F> TryFilter<St, Fut, F> - where St: TryStream +where + St: TryStream, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - f, - pending_fut: None, - pending_item: None, - } + Self { stream, f, pending_fut: None, pending_item: None } } delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F> - where St: TryStream + FusedStream, - F: FnMut(&St::Ok) -> Fut, - Fut: Future<Output = bool>, +where + St: TryStream + FusedStream, + F: FnMut(&St::Ok) -> Fut, + Fut: Future<Output = bool>, { fn is_terminated(&self) -> bool { self.pending_fut.is_none() && self.stream.is_terminated() @@ -65,16 +62,14 @@ impl<St, Fut, F> FusedStream for TryFilter<St, Fut, F> } impl<St, Fut, F> Stream for TryFilter<St, Fut, F> - where St: TryStream, - Fut: Future<Output = bool>, - F: FnMut(&St::Ok) -> Fut, +where + St: TryStream, + Fut: Future<Output = bool>, + F: FnMut(&St::Ok) -> Fut, { type Item = Result<St::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); Poll::Ready(loop { @@ -108,7 +103,8 @@ impl<St, Fut, F> Stream for TryFilter<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item, E> Sink<Item> for TryFilter<S, Fut, F> - where S: TryStream + Sink<Item, Error = E>, +where + S: TryStream + Sink<Item, Error = E>, { type Error = E; diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs index 335649b..bb1b5b9 100644 --- a/src/stream/try_stream/try_filter_map.rs +++ b/src/stream/try_stream/try_filter_map.rs @@ -1,8 +1,8 @@ use core::fmt; use core::pin::Pin; -use futures_core::future::{TryFuture}; +use futures_core::future::TryFuture; use futures_core::ready; -use futures_core::stream::{Stream, TryStream, FusedStream}; +use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -43,9 +43,10 @@ impl<St, Fut, F> TryFilterMap<St, Fut, F> { } impl<St, Fut, F, T> FusedStream for TryFilterMap<St, Fut, F> - where St: TryStream + FusedStream, - Fut: TryFuture<Ok = Option<T>, Error = St::Error>, - F: FnMut(St::Ok) -> Fut, +where + St: TryStream + FusedStream, + Fut: TryFuture<Ok = Option<T>, Error = St::Error>, + F: FnMut(St::Ok) -> Fut, { fn is_terminated(&self) -> bool { self.pending.is_none() && self.stream.is_terminated() @@ -53,16 +54,14 @@ impl<St, Fut, F, T> FusedStream for TryFilterMap<St, Fut, F> } impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F> - where St: TryStream, - Fut: TryFuture<Ok = Option<T>, Error = St::Error>, - F: FnMut(St::Ok) -> Fut, +where + St: TryStream, + Fut: TryFuture<Ok = Option<T>, Error = St::Error>, + F: FnMut(St::Ok) -> Fut, { type Item = Result<T, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); Poll::Ready(loop { @@ -98,7 +97,8 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item> Sink<Item> for TryFilterMap<S, Fut, F> - where S: Sink<Item>, +where + S: Sink<Item>, { type Error = S::Error; diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs index 1d41e4b..d344d96 100644 --- a/src/stream/try_stream/try_fold.rs +++ b/src/stream/try_stream/try_fold.rs @@ -35,24 +35,21 @@ where } impl<St, Fut, T, F> TryFold<St, Fut, T, F> -where St: TryStream, - F: FnMut(T, St::Ok) -> Fut, - Fut: TryFuture<Ok = T, Error = St::Error>, +where + St: TryStream, + F: FnMut(T, St::Ok) -> Fut, + Fut: TryFuture<Ok = T, Error = St::Error>, { pub(super) fn new(stream: St, f: F, t: T) -> Self { - Self { - stream, - f, - accum: Some(t), - future: None, - } + Self { stream, f, accum: Some(t), future: None } } } impl<St, Fut, T, F> FusedFuture for TryFold<St, Fut, T, F> - where St: TryStream, - F: FnMut(T, St::Ok) -> Fut, - Fut: TryFuture<Ok = T, Error = St::Error>, +where + St: TryStream, + F: FnMut(T, St::Ok) -> Fut, + Fut: TryFuture<Ok = T, Error = St::Error>, { fn is_terminated(&self) -> bool { self.accum.is_none() && self.future.is_none() @@ -60,9 +57,10 @@ impl<St, Fut, T, F> FusedFuture for TryFold<St, Fut, T, F> } impl<St, Fut, T, F> Future for TryFold<St, Fut, T, F> - where St: TryStream, - F: FnMut(T, St::Ok) -> Fut, - Fut: TryFuture<Ok = T, Error = St::Error>, +where + St: TryStream, + F: FnMut(T, St::Ok) -> Fut, + Fut: TryFuture<Ok = T, Error = St::Error>, { type Output = Result<T, St::Error>; diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs index 0a814ae..6a081d8 100644 --- a/src/stream/try_stream/try_for_each.rs +++ b/src/stream/try_stream/try_for_each.rs @@ -32,23 +32,21 @@ where } impl<St, Fut, F> TryForEach<St, Fut, F> -where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: TryFuture<Ok = (), Error = St::Error>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture<Ok = (), Error = St::Error>, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - f, - future: None, - } + Self { stream, f, future: None } } } impl<St, Fut, F> Future for TryForEach<St, Fut, F> - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: TryFuture<Ok = (), Error = St::Error>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: TryFuture<Ok = (), Error = St::Error>, { type Output = Result<(), St::Error>; diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs index d2f4b0f..62734c7 100644 --- a/src/stream/try_stream/try_for_each_concurrent.rs +++ b/src/stream/try_stream/try_for_each_concurrent.rs @@ -1,8 +1,8 @@ use crate::stream::{FuturesUnordered, StreamExt}; use core::fmt; use core::mem; -use core::pin::Pin; use core::num::NonZeroUsize; +use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; @@ -37,9 +37,10 @@ where } impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F> - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future<Output = Result<(), St::Error>>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = Result<(), St::Error>>, { fn is_terminated(&self) -> bool { self.stream.is_none() && self.futures.is_empty() @@ -47,9 +48,10 @@ impl<St, Fut, F> FusedFuture for TryForEachConcurrent<St, Fut, F> } impl<St, Fut, F> TryForEachConcurrent<St, Fut, F> -where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future<Output = Result<(), St::Error>>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = Result<(), St::Error>>, { pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { Self { @@ -63,9 +65,10 @@ where St: TryStream, } impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> - where St: TryStream, - F: FnMut(St::Ok) -> Fut, - Fut: Future<Output = Result<(), St::Error>>, +where + St: TryStream, + F: FnMut(St::Ok) -> Fut, + Fut: Future<Output = Result<(), St::Error>>, { type Output = Result<(), St::Error>; @@ -85,7 +88,7 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Poll::Ready(Some(Ok(elem))) => { made_progress_this_iter = true; Some(elem) - }, + } Poll::Ready(None) => { this.stream.set(None); None @@ -109,9 +112,9 @@ impl<St, Fut, F> Future for TryForEachConcurrent<St, Fut, F> Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true, Poll::Ready(None) => { if this.stream.is_none() { - return Poll::Ready(Ok(())) + return Poll::Ready(Ok(())); } - }, + } Poll::Pending => {} Poll::Ready(Some(Err(e))) => { // Empty the stream and futures so that we know diff --git a/src/stream/try_stream/try_next.rs b/src/stream/try_stream/try_next.rs index 1bc00fb..13fcf80 100644 --- a/src/stream/try_stream/try_next.rs +++ b/src/stream/try_stream/try_next.rs @@ -28,10 +28,7 @@ impl<St: ?Sized + TryStream + Unpin + FusedStream> FusedFuture for TryNext<'_, S impl<St: ?Sized + TryStream + Unpin> Future for TryNext<'_, St> { type Output = Result<Option<St::Ok>, St::Error>; - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.stream.try_poll_next_unpin(cx)?.map(Ok) } } diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs index 0603b10..a424b6c 100644 --- a/src/stream/try_stream/try_skip_while.rs +++ b/src/stream/try_stream/try_skip_while.rs @@ -2,7 +2,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; use futures_core::ready; -use futures_core::stream::{Stream, TryStream, FusedStream}; +use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; @@ -40,34 +40,27 @@ where } impl<St, Fut, F> TrySkipWhile<St, Fut, F> - where St: TryStream, - F: FnMut(&St::Ok) -> Fut, - Fut: TryFuture<Ok = bool, Error = St::Error>, +where + St: TryStream, + F: FnMut(&St::Ok) -> Fut, + Fut: TryFuture<Ok = bool, Error = St::Error>, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - f, - pending_fut: None, - pending_item: None, - done_skipping: false, - } + Self { stream, f, pending_fut: None, pending_item: None, done_skipping: false } } delegate_access_inner!(stream, St, ()); } impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> - where St: TryStream, - F: FnMut(&St::Ok) -> Fut, - Fut: TryFuture<Ok = bool, Error = St::Error>, +where + St: TryStream, + F: FnMut(&St::Ok) -> Fut, + Fut: TryFuture<Ok = bool, Error = St::Error>, { type Item = Result<St::Ok, St::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); if *this.done_skipping { @@ -105,9 +98,10 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> } impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F> - where St: TryStream + FusedStream, - F: FnMut(&St::Ok) -> Fut, - Fut: TryFuture<Ok = bool, Error = St::Error>, +where + St: TryStream + FusedStream, + F: FnMut(&St::Ok) -> Fut, + Fut: TryFuture<Ok = bool, Error = St::Error>, { fn is_terminated(&self) -> bool { self.pending_item.is_none() && self.stream.is_terminated() @@ -117,7 +111,8 @@ impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F> // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Fut, F, Item, E> Sink<Item> for TrySkipWhile<S, Fut, F> - where S: TryStream + Sink<Item, Error = E>, +where + S: TryStream + Sink<Item, Error = E>, { type Error = E; diff --git a/src/stream/try_stream/try_take_while.rs b/src/stream/try_stream/try_take_while.rs index 6241572..3375960 100644 --- a/src/stream/try_stream/try_take_while.rs +++ b/src/stream/try_stream/try_take_while.rs @@ -49,13 +49,7 @@ where Fut: TryFuture<Ok = bool, Error = St::Error>, { pub(super) fn new(stream: St, f: F) -> Self { - Self { - stream, - f, - pending_fut: None, - pending_item: None, - done_taking: false, - } + Self { stream, f, pending_fut: None, pending_item: None, done_taking: false } } delegate_access_inner!(stream, St, ()); diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs index 258c18e..fd9cdf1 100644 --- a/src/stream/try_stream/try_unfold.rs +++ b/src/stream/try_stream/try_unfold.rs @@ -61,11 +61,7 @@ where F: FnMut(T) -> Fut, Fut: TryFuture<Ok = Option<(Item, T)>>, { - assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { - f, - state: Some(init), - fut: None, - }) + assert_stream::<Result<Item, Fut::Error>, _>(TryUnfold { f, state: Some(init), fut: None }) } pin_project! { @@ -85,10 +81,7 @@ where Fut: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryUnfold") - .field("state", &self.state) - .field("fut", &self.fut) - .finish() + f.debug_struct("TryUnfold").field("state", &self.state).field("fut", &self.fut).finish() } } @@ -99,10 +92,7 @@ where { type Item = Result<Item, Fut::Error>; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); if let Some(state) = this.state.take() { diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index e17d465..7d8ef6b 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs @@ -52,10 +52,7 @@ where F: FnMut(T) -> Fut, Fut: Future<Output = Option<(Item, T)>>, { - assert_stream::<Item, _>(Unfold { - f, - state: UnfoldState::Value { value: init }, - }) + assert_stream::<Item, _>(Unfold { f, state: UnfoldState::Value { value: init } }) } pin_project! { @@ -74,9 +71,7 @@ where Fut: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Unfold") - .field("state", &self.state) - .finish() + f.debug_struct("Unfold").field("state", &self.state).finish() } } @@ -105,9 +100,7 @@ where let mut this = self.project(); if let Some(state) = this.state.as_mut().take_value() { - this.state.set(UnfoldState::Future { - future: (this.f)(state), - }); + this.state.set(UnfoldState::Future { future: (this.f)(state) }); } let step = match this.state.as_mut().project_future() { diff --git a/src/task/mod.rs b/src/task/mod.rs index dd1515c..c4afe30 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -11,12 +11,9 @@ //! executors or dealing with synchronization issues around task wakeup. #[doc(no_inline)] -pub use core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; +pub use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -pub use futures_task::{ - Spawn, LocalSpawn, SpawnError, - FutureObj, LocalFutureObj, UnsafeFutureObj, -}; +pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj}; pub use futures_task::noop_waker; #[cfg(feature = "std")] @@ -36,4 +33,4 @@ cfg_target_has_atomic! { } mod spawn; -pub use self::spawn::{SpawnExt, LocalSpawnExt}; +pub use self::spawn::{LocalSpawnExt, SpawnExt}; |