diff options
author | Treehugger Robot <treehugger-gerrit@google.com> | 2021-02-10 12:32:08 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-02-10 12:32:08 +0000 |
commit | ace3f190a64e6095d8888f52cf5e188db07b59af (patch) | |
tree | 6768a19a65780281fc232fc768b947d4be2b124b | |
parent | b79273b58553c67dbfc3be350ce662ec1763d8bf (diff) | |
parent | 102eb371e74174a76a956011444b11fd7eebe656 (diff) | |
download | futures-util-ace3f190a64e6095d8888f52cf5e188db07b59af.tar.gz |
Merge "Upgrade rust/crates/futures-util to 0.3.12" am: 102eb371e7
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/1581956
MUST ONLY BE SUBMITTED BY AUTOMERGER
Change-Id: I1094a39a3e2a6f3229cbacef8c0510c400b1050d
141 files changed, 2434 insertions, 1385 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..4fd4ba3 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,5 @@ +{ + "git": { + "sha1": "1d53a29ec16ccd5b094fb205edb73591455eb4b6" + } +} @@ -32,10 +32,11 @@ rust_defaults { "libfutures_sink", "libfutures_task", "libmemchr", - "libpin_project", + "libpin_project_lite", "libpin_utils", "libproc_macro_nested", "libslab", + "libtokio", ], proc_macros: [ "libfutures_macro", @@ -46,6 +47,9 @@ rust_defaults { rust_test_host { name: "futures-util_host_test_src_lib", defaults: ["futures-util_defaults"], + test_options: { + unit_test: true, + }, } rust_test { @@ -84,7 +88,7 @@ rust_library { "libfutures_sink", "libfutures_task", "libmemchr", - "libpin_project", + "libpin_project_lite", "libpin_utils", "libproc_macro_nested", "libslab", @@ -100,21 +104,62 @@ rust_library { } // dependent_library ["feature_list"] -// futures-channel-0.3.8 "alloc,std" -// futures-core-0.3.8 "alloc,std" -// futures-io-0.3.8 "std" -// futures-macro-0.3.7 -// futures-sink-0.3.8 -// futures-task-0.3.8 "alloc,once_cell,std" +// autocfg-1.0.1 +// byteorder-1.4.2 "default,std" +// bytes-0.4.12 +// cfg-if-0.1.10 +// cfg-if-1.0.0 +// crossbeam-deque-0.7.3 +// crossbeam-epoch-0.8.2 "default,lazy_static,std" +// crossbeam-queue-0.2.3 "default,std" +// crossbeam-utils-0.7.2 "default,lazy_static,std" +// fnv-1.0.7 "default,std" +// futures-0.1.30 "default,use_std,with-deprecated" +// futures-channel-0.3.12 "alloc,std" +// futures-core-0.3.12 "alloc,std" +// futures-io-0.3.12 "std" +// futures-macro-0.3.12 +// futures-sink-0.3.12 +// futures-task-0.3.12 "alloc,once_cell,std" +// iovec-0.1.4 +// lazy_static-1.4.0 +// libc-0.2.86 "default,std" +// lock_api-0.3.4 +// log-0.4.14 +// maybe-uninit-2.0.0 // memchr-2.3.4 "default,std" +// memoffset-0.5.6 "default" +// mio-0.6.23 "default,with-deprecated" +// mio-uds-0.6.8 +// net2-0.2.37 "default,duration" +// num_cpus-1.13.0 // once_cell-1.5.2 "alloc,std" -// pin-project-1.0.2 -// pin-project-internal-1.0.2 +// parking_lot-0.9.0 "default" +// parking_lot_core-0.6.2 +// pin-project-lite-0.2.4 // pin-utils-0.1.0 // proc-macro-hack-0.5.19 -// proc-macro-nested-0.1.6 +// proc-macro-nested-0.1.7 // proc-macro2-1.0.24 "default,proc-macro" -// quote-1.0.7 "default,proc-macro" +// quote-1.0.8 "default,proc-macro" +// rustc_version-0.2.3 +// scopeguard-1.1.0 +// semver-0.9.0 "default" +// semver-parser-0.7.0 // slab-0.4.2 -// syn-1.0.53 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut" +// smallvec-0.6.14 "default,std" +// syn-1.0.60 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote" +// tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds" +// tokio-codec-0.1.2 +// tokio-current-thread-0.1.7 +// tokio-executor-0.1.10 +// tokio-fs-0.1.7 +// tokio-io-0.1.13 +// tokio-reactor-0.1.12 +// tokio-sync-0.1.8 +// tokio-tcp-0.1.4 +// tokio-threadpool-0.1.18 +// tokio-timer-0.2.13 +// tokio-udp-0.1.6 +// tokio-uds-0.2.7 // unicode-xid-0.2.1 "default" @@ -13,44 +13,44 @@ [package] edition = "2018" name = "futures-util" -version = "0.3.7" +version = "0.3.12" authors = ["Alex Crichton <alex@alexcrichton.com>"] description = "Common utilities and extension traits for the futures-rs library.\n" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures-util/0.3.7" +documentation = "https://docs.rs/futures-util/0.3" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies.futures-channel] -version = "0.3.7" +version = "0.3.12" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.7" +version = "0.3.12" default-features = false [dependencies.futures-io] -version = "0.3.7" +version = "0.3.12" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.7" +version = "=0.3.12" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.7" +version = "0.3.12" optional = true default-features = false [dependencies.futures-task] -version = "0.3.7" +version = "0.3.12" default-features = false [dependencies.futures_01] @@ -62,14 +62,14 @@ package = "futures" version = "2.2" optional = true -[dependencies.pin-project] -version = "1.0.1" +[dependencies.pin-project-lite] +version = "0.2.4" [dependencies.pin-utils] version = "0.1.0" [dependencies.proc-macro-hack] -version = "0.5.9" +version = "0.5.19" optional = true [dependencies.proc-macro-nested] @@ -83,6 +83,8 @@ optional = true [dependencies.tokio-io] version = "0.1.9" optional = true +[dev-dependencies.tokio] +version = "0.1.11" [features] alloc = ["futures-core/alloc", "futures-task/alloc"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index c768e59..010902b 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,12 +1,12 @@ [package] name = "futures-util" edition = "2018" -version = "0.3.7" +version = "0.3.12" authors = ["Alex Crichton <alex@alexcrichton.com>"] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures-util/0.3.7" +documentation = "https://docs.rs/futures-util/0.3" description = """ Common utilities and extension traits for the futures-rs library. """ @@ -33,20 +33,25 @@ read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"] write-all-vectored = ["io"] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.7", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.7", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.7", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.7", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.7", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.7", default-features = false, optional = true } -proc-macro-hack = { version = "0.5.9", optional = true } +futures-core = { path = "../futures-core", version = "0.3.12", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.12", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.12", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.12", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.12", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.12", default-features = false, optional = true } +proc-macro-hack = { version = "0.5.19", optional = true } proc-macro-nested = { version = "0.1.2", optional = true } slab = { version = "0.4.2", optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" -pin-project = "1.0.1" +pin-project-lite = "0.2.4" + +[dev-dependencies] +futures = { path = "../futures", features = ["async-await", "thread-pool"] } +futures-test = { path = "../futures-test" } +tokio = "0.1.11" [package.metadata.docs.rs] all-features = true @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.7.crate" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.12.crate" } - version: "0.3.7" + version: "0.3.12" license_type: NOTICE last_upgrade_date { - year: 2020 - month: 10 - day: 25 + year: 2021 + month: 2 + day: 9 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index b518f2a..7e10dd0 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -1,4 +1,4 @@ -// Generated by cargo2android.py for tests in Android.bp +// Generated by update_crate_tests.py for tests that depend on this crate. { "presubmit": [ { diff --git a/benches_disabled/bilock.rs b/benches_disabled/bilock.rs index 78b5edb..48afe3c 100644 --- a/benches_disabled/bilock.rs +++ b/benches_disabled/bilock.rs @@ -29,8 +29,8 @@ struct LockStream { } impl LockStream { - fn new(lock: BiLock<u32>) -> LockStream { - LockStream { + fn new(lock: BiLock<u32>) -> Self { + Self { lock: lock.lock() } } diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs index 4200c08..d6ddb6d 100644 --- a/src/async_await/join_mod.rs +++ b/src/async_await/join_mod.rs @@ -82,11 +82,11 @@ macro_rules! document_join_macro { } #[doc(hidden)] -#[proc_macro_hack(support_nested)] +#[proc_macro_hack(support_nested, only_hack_old_rustc)] pub use futures_macro::join_internal; #[doc(hidden)] -#[proc_macro_hack(support_nested)] +#[proc_macro_hack(support_nested, only_hack_old_rustc)] pub use futures_macro::try_join_internal; document_join_macro! { diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs index 47eca4d..59bca08 100644 --- a/src/async_await/select_mod.rs +++ b/src/async_await/select_mod.rs @@ -310,11 +310,11 @@ macro_rules! document_select_macro { #[cfg(feature = "std")] #[doc(hidden)] -#[proc_macro_hack(support_nested)] +#[proc_macro_hack(support_nested, only_hack_old_rustc)] pub use futures_macro::select_internal; #[doc(hidden)] -#[proc_macro_hack(support_nested)] +#[proc_macro_hack(support_nested, only_hack_old_rustc)] pub use futures_macro::select_biased_internal; document_select_macro! { diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs index 95025d2..bc3aee3 100644 --- a/src/compat/compat01as03.rs +++ b/src/compat/compat01as03.rs @@ -32,8 +32,8 @@ impl<T> Unpin for Compat01As03<T> {} impl<T> Compat01As03<T> { /// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite /// object in a futures 0.3-compatible wrapper. - pub fn new(object: T) -> Compat01As03<T> { - Compat01As03 { + pub fn new(object: T) -> Self { + Self { inner: spawn01(object), } } @@ -197,8 +197,8 @@ impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {} #[cfg(feature = "sink")] impl<S, SinkItem> Compat01As03Sink<S, SinkItem> { /// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper. - pub fn new(inner: S) -> Compat01As03Sink<S, SinkItem> { - Compat01As03Sink { + pub fn new(inner: S) -> Self { + Self { inner: spawn01(inner), buffer: None, close_started: false @@ -344,10 +344,10 @@ struct NotifyWaker(task03::Waker); struct WakerToHandle<'a>(&'a task03::Waker); impl From<WakerToHandle<'_>> for NotifyHandle01 { - fn from(handle: WakerToHandle<'_>) -> NotifyHandle01 { + fn from(handle: WakerToHandle<'_>) -> Self { let ptr = Box::new(NotifyWaker(handle.0.clone())); - unsafe { NotifyHandle01::new(Box::into_raw(ptr)) } + unsafe { Self::new(Box::into_raw(ptr)) } } } diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs index 4841c5e..3f1eebb 100644 --- a/src/compat/compat03as01.rs +++ b/src/compat/compat03as01.rs @@ -54,8 +54,8 @@ impl<T> Compat<T> { /// For types which implement appropriate futures `0.3` /// traits, the result will be a type which implements /// the corresponding futures 0.1 type. - pub fn new(inner: T) -> Compat<T> { - Compat { inner } + pub fn new(inner: T) -> Self { + Self { inner } } /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object @@ -80,7 +80,7 @@ impl<T> Compat<T> { impl<T, Item> CompatSink<T, Item> { /// Creates a new [`CompatSink`]. pub fn new(inner: T) -> Self { - CompatSink { + Self { inner, _phantom: PhantomData, } @@ -174,8 +174,8 @@ where struct Current(task01::Task); impl Current { - fn new() -> Current { - Current(task01::current()) + fn new() -> Self { + Self(task01::current()) } fn as_waker(&self) -> WakerRef<'_> { diff --git a/src/compat/mod.rs b/src/compat/mod.rs index 897a50a..c5edcc5 100644 --- a/src/compat/mod.rs +++ b/src/compat/mod.rs @@ -1,4 +1,4 @@ -//! Futures 0.1 / 0.3 shims +//! Interop between `futures` 0.1 and 0.3. //! //! This module is only available when the `compat` feature of this //! library is activated. @@ -75,7 +75,7 @@ pub struct OkFn<E>(PhantomData<fn(E)>); impl<E> Default for OkFn<E> { fn default() -> Self { - OkFn(PhantomData) + Self(PhantomData) } } @@ -344,7 +344,7 @@ pub struct IntoFn<T>(PhantomData<fn() -> T>); impl<T> Default for IntoFn<T> { fn default() -> Self { - IntoFn(PhantomData) + Self(PhantomData) } } impl<A, T> FnOnce1<A> for IntoFn<T> where A: Into<T> { diff --git a/src/future/abortable.rs b/src/future/abortable.rs index 3a6b587..1fc75b0 100644 --- a/src/future/abortable.rs +++ b/src/future/abortable.rs @@ -5,16 +5,17 @@ use core::fmt; use core::pin::Pin; use core::sync::atomic::{AtomicBool, Ordering}; use alloc::sync::Arc; -use pin_project::pin_project; - -/// A future which can be remotely short-circuited using an `AbortHandle`. -#[pin_project] -#[derive(Debug, Clone)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Abortable<Fut> { - #[pin] - future: Fut, - inner: Arc<AbortInner>, +use pin_project_lite::pin_project; + +pin_project! { + /// A future which can be remotely short-circuited using an `AbortHandle`. + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Abortable<Fut> { + #[pin] + future: Fut, + inner: Arc<AbortInner>, + } } impl<Fut> Abortable<Fut> where Fut: Future { @@ -38,7 +39,7 @@ impl<Fut> Abortable<Fut> where Fut: Future { /// # }); /// ``` pub fn new(future: Fut, reg: AbortRegistration) -> Self { - Abortable { + Self { future, inner: reg.inner, } @@ -84,7 +85,7 @@ impl AbortHandle { }); ( - AbortHandle { + Self { inner: inner.clone(), }, AbortRegistration { diff --git a/src/future/either.rs b/src/future/either.rs index aa17fa7..a1b9f0a 100644 --- a/src/future/either.rs +++ b/src/future/either.rs @@ -4,17 +4,26 @@ use futures_core::future::{FusedFuture, Future}; use futures_core::stream::{FusedStream, Stream}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; /// Combines two different futures, streams, or sinks having the same associated types into a single /// type. -#[pin_project(project = EitherProj)] #[derive(Debug, Clone)] pub enum Either<A, B> { /// First branch of the type - Left(#[pin] A), + Left(/* #[pin] */ A), /// Second branch of the type - Right(#[pin] B), + Right(/* #[pin] */ B), +} + +impl<A, B> Either<A, B> { + fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> { + unsafe { + match self.get_unchecked_mut() { + Either::Left(a) => Either::Left(Pin::new_unchecked(a)), + Either::Right(b) => Either::Right(Pin::new_unchecked(b)), + } + } + } } impl<A, B, T> Either<(T, A), (T, B)> { @@ -60,8 +69,8 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.project() { - EitherProj::Left(x) => x.poll(cx), - EitherProj::Right(x) => x.poll(cx), + Either::Left(x) => x.poll(cx), + Either::Right(x) => x.poll(cx), } } } @@ -88,8 +97,8 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { match self.project() { - EitherProj::Left(x) => x.poll_next(cx), - EitherProj::Right(x) => x.poll_next(cx), + Either::Left(x) => x.poll_next(cx), + Either::Right(x) => x.poll_next(cx), } } } @@ -117,29 +126,29 @@ where fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match self.project() { - EitherProj::Left(x) => x.poll_ready(cx), - EitherProj::Right(x) => x.poll_ready(cx), + Either::Left(x) => x.poll_ready(cx), + Either::Right(x) => x.poll_ready(cx), } } fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { match self.project() { - EitherProj::Left(x) => x.start_send(item), - EitherProj::Right(x) => x.start_send(item), + Either::Left(x) => x.start_send(item), + Either::Right(x) => x.start_send(item), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match self.project() { - EitherProj::Left(x) => x.poll_flush(cx), - EitherProj::Right(x) => x.poll_flush(cx), + Either::Left(x) => x.poll_flush(cx), + Either::Right(x) => x.poll_flush(cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match self.project() { - EitherProj::Left(x) => x.poll_close(cx), - EitherProj::Right(x) => x.poll_close(cx), + Either::Left(x) => x.poll_close(cx), + Either::Right(x) => x.poll_close(cx), } } } @@ -176,8 +185,8 @@ mod if_std { buf: &mut [u8], ) -> Poll<Result<usize>> { match self.project() { - EitherProj::Left(x) => x.poll_read(cx, buf), - EitherProj::Right(x) => x.poll_read(cx, buf), + Either::Left(x) => x.poll_read(cx, buf), + Either::Right(x) => x.poll_read(cx, buf), } } @@ -187,8 +196,8 @@ mod if_std { bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>> { match self.project() { - EitherProj::Left(x) => x.poll_read_vectored(cx, bufs), - EitherProj::Right(x) => x.poll_read_vectored(cx, bufs), + Either::Left(x) => x.poll_read_vectored(cx, bufs), + Either::Right(x) => x.poll_read_vectored(cx, bufs), } } } @@ -204,8 +213,8 @@ mod if_std { buf: &[u8], ) -> Poll<Result<usize>> { match self.project() { - EitherProj::Left(x) => x.poll_write(cx, buf), - EitherProj::Right(x) => x.poll_write(cx, buf), + Either::Left(x) => x.poll_write(cx, buf), + Either::Right(x) => x.poll_write(cx, buf), } } @@ -215,22 +224,22 @@ mod if_std { bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>> { match self.project() { - EitherProj::Left(x) => x.poll_write_vectored(cx, bufs), - EitherProj::Right(x) => x.poll_write_vectored(cx, bufs), + Either::Left(x) => x.poll_write_vectored(cx, bufs), + Either::Right(x) => x.poll_write_vectored(cx, bufs), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { match self.project() { - EitherProj::Left(x) => x.poll_flush(cx), - EitherProj::Right(x) => x.poll_flush(cx), + Either::Left(x) => x.poll_flush(cx), + Either::Right(x) => x.poll_flush(cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { match self.project() { - EitherProj::Left(x) => x.poll_close(cx), - EitherProj::Right(x) => x.poll_close(cx), + Either::Left(x) => x.poll_close(cx), + Either::Right(x) => x.poll_close(cx), } } } @@ -246,8 +255,8 @@ mod if_std { pos: SeekFrom, ) -> Poll<Result<u64>> { match self.project() { - EitherProj::Left(x) => x.poll_seek(cx, pos), - EitherProj::Right(x) => x.poll_seek(cx, pos), + Either::Left(x) => x.poll_seek(cx, pos), + Either::Right(x) => x.poll_seek(cx, pos), } } } @@ -259,15 +268,15 @@ mod if_std { { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { match self.project() { - EitherProj::Left(x) => x.poll_fill_buf(cx), - EitherProj::Right(x) => x.poll_fill_buf(cx), + Either::Left(x) => x.poll_fill_buf(cx), + Either::Right(x) => x.poll_fill_buf(cx), } } fn consume(self: Pin<&mut Self>, amt: usize) { match self.project() { - EitherProj::Left(x) => x.consume(amt), - EitherProj::Right(x) => x.consume(amt), + Either::Left(x) => x.consume(amt), + Either::Right(x) => x.consume(amt), } } } diff --git a/src/future/future/catch_unwind.rs b/src/future/future/catch_unwind.rs index 33839f6..3f16577 100644 --- a/src/future/future/catch_unwind.rs +++ b/src/future/future/catch_unwind.rs @@ -4,17 +4,21 @@ use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CatchUnwind<Fut>(#[pin] Fut); +pin_project! { + /// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CatchUnwind<Fut> { + #[pin] + future: Fut, + } +} impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe { - pub(super) fn new(future: Fut) -> CatchUnwind<Fut> { - CatchUnwind(future) + pub(super) fn new(future: Fut) -> Self { + Self { future } } } @@ -24,7 +28,7 @@ impl<Fut> Future for CatchUnwind<Fut> type Output = Result<Fut::Output, Box<dyn Any + Send>>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let f = self.project().0; + let f = self.project().future; catch_unwind(AssertUnwindSafe(|| f.poll(cx)))?.map(Ok) } } diff --git a/src/future/future/flatten.rs b/src/future/future/flatten.rs index 53f75e2..0c48a4f 100644 --- a/src/future/future/flatten.rs +++ b/src/future/future/flatten.rs @@ -1,22 +1,25 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; #[cfg(feature = "sink")] use futures_sink::Sink; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -#[pin_project(project = FlattenProj)] -#[derive(Debug)] -pub enum Flatten<Fut1, Fut2> { - First(#[pin] Fut1), - Second(#[pin] Fut2), - Empty, +pin_project! { + #[project = FlattenProj] + #[derive(Debug)] + pub enum Flatten<Fut1, Fut2> { + First { #[pin] f: Fut1 }, + Second { #[pin] f: Fut2 }, + Empty, + } } impl<Fut1, Fut2> Flatten<Fut1, Fut2> { pub(crate) fn new(future: Fut1) -> Self { - Flatten::First(future) + Self::First { f: future } } } @@ -26,7 +29,7 @@ impl<Fut> FusedFuture for Flatten<Fut, Fut::Output> { fn is_terminated(&self) -> bool { match self { - Flatten::Empty => true, + Self::Empty => true, _ => false, } } @@ -41,13 +44,13 @@ impl<Fut> Future for Flatten<Fut, Fut::Output> fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { match self.as_mut().project() { - FlattenProj::First(f) => { + FlattenProj::First { f } => { let f = ready!(f.poll(cx)); - self.set(Flatten::Second(f)); + self.set(Self::Second { f }); }, - FlattenProj::Second(f) => { + FlattenProj::Second { f } => { let output = ready!(f.poll(cx)); - self.set(Flatten::Empty); + self.set(Self::Empty); break output; }, FlattenProj::Empty => panic!("Flatten polled after completion"), @@ -62,7 +65,7 @@ impl<Fut> FusedStream for Flatten<Fut, Fut::Output> { fn is_terminated(&self) -> bool { match self { - Flatten::Empty => true, + Self::Empty => true, _ => false, } } @@ -77,14 +80,14 @@ impl<Fut> Stream for Flatten<Fut, Fut::Output> fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(loop { match self.as_mut().project() { - FlattenProj::First(f) => { + FlattenProj::First { f } => { let f = ready!(f.poll(cx)); - self.set(Flatten::Second(f)); + self.set(Self::Second { f }); }, - FlattenProj::Second(f) => { + FlattenProj::Second { f } => { let output = ready!(f.poll_next(cx)); if output.is_none() { - self.set(Flatten::Empty); + self.set(Self::Empty); } break output; }, @@ -109,11 +112,11 @@ where ) -> Poll<Result<(), Self::Error>> { Poll::Ready(loop { match self.as_mut().project() { - FlattenProj::First(f) => { + FlattenProj::First { f } => { let f = ready!(f.poll(cx)); - self.set(Flatten::Second(f)); + self.set(Self::Second { f }); }, - FlattenProj::Second(f) => { + FlattenProj::Second { f } => { break ready!(f.poll_ready(cx)); }, FlattenProj::Empty => panic!("poll_ready called after eof"), @@ -123,16 +126,16 @@ where fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { match self.project() { - FlattenProj::First(_) => panic!("poll_ready not called first"), - FlattenProj::Second(f) => f.start_send(item), + FlattenProj::First { .. } => panic!("poll_ready not called first"), + FlattenProj::Second { f } => f.start_send(item), FlattenProj::Empty => panic!("start_send called after eof"), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match self.project() { - FlattenProj::First(_) => Poll::Ready(Ok(())), - FlattenProj::Second(f) => f.poll_flush(cx), + FlattenProj::First { .. } => Poll::Ready(Ok(())), + FlattenProj::Second { f } => f.poll_flush(cx), FlattenProj::Empty => panic!("poll_flush called after eof"), } } @@ -142,11 +145,11 @@ where cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { let res = match self.as_mut().project() { - FlattenProj::Second(f) => f.poll_close(cx), + FlattenProj::Second { f } => f.poll_close(cx), _ => Poll::Ready(Ok(())), }; if res.is_ready() { - self.set(Flatten::Empty); + self.set(Self::Empty); } res } diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs index 9f2e1ca..f4284ba 100644 --- a/src/future/future/fuse.rs +++ b/src/future/future/fuse.rs @@ -1,17 +1,22 @@ use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; +use futures_core::ready; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`fuse`](super::FutureExt::fuse) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Fuse<Fut>(#[pin] Option<Fut>); +pin_project! { + /// Future for the [`fuse`](super::FutureExt::fuse) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Fuse<Fut> { + #[pin] + inner: Option<Fut>, + } +} impl<Fut> Fuse<Fut> { - pub(super) fn new(f: Fut) -> Fuse<Fut> { - Fuse(Some(f)) + pub(super) fn new(f: Fut) -> Self { + Self { inner: Some(f) } } } @@ -61,14 +66,14 @@ impl<Fut: Future> Fuse<Fut> { /// } /// # }); /// ``` - pub fn terminated() -> Fuse<Fut> { - Fuse(None) + pub fn terminated() -> Self { + Self { inner: None } } } impl<Fut: Future> FusedFuture for Fuse<Fut> { fn is_terminated(&self) -> bool { - self.0.is_none() + self.inner.is_none() } } @@ -76,10 +81,10 @@ impl<Fut: Future> Future for Fuse<Fut> { type Output = Fut::Output; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> { - Poll::Ready(match self.as_mut().project().0.as_pin_mut() { + Poll::Ready(match self.as_mut().project().inner.as_pin_mut() { Some(fut) => { let output = ready!(fut.poll(cx)); - self.project().0.set(None); + self.project().inner.set(None); output }, None => return Poll::Pending, diff --git a/src/future/future/map.rs b/src/future/future/map.rs index 8e7f636..7471aba 100644 --- a/src/future/future/map.rs +++ b/src/future/future/map.rs @@ -1,45 +1,51 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::fns::FnOnce1; -/// Internal Map future -#[pin_project(project = MapProj, project_replace = MapProjOwn)] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub enum Map<Fut, F> { - Incomplete { - #[pin] - future: Fut, - f: F, - }, - Complete, +pin_project! { + /// Internal Map future + #[project = MapProj] + #[project_replace = MapProjReplace] + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub enum Map<Fut, F> { + Incomplete { + #[pin] + future: Fut, + f: F, + }, + Complete, + } } impl<Fut, F> Map<Fut, F> { /// Creates a new Map. - pub(crate) fn new(future: Fut, f: F) -> Map<Fut, F> { - Map::Incomplete { future, f } + pub(crate) fn new(future: Fut, f: F) -> Self { + Self::Incomplete { future, f } } } impl<Fut, F, T> FusedFuture for Map<Fut, F> - where Fut: Future, - F: FnOnce1<Fut::Output, Output=T>, +where + Fut: Future, + F: FnOnce1<Fut::Output, Output = T>, { fn is_terminated(&self) -> bool { match self { - Map::Incomplete { .. } => false, - Map::Complete => true, + Self::Incomplete { .. } => false, + Self::Complete => true, } } } impl<Fut, F, T> Future for Map<Fut, F> - where Fut: Future, - F: FnOnce1<Fut::Output, Output=T>, +where + Fut: Future, + F: FnOnce1<Fut::Output, Output = T>, { type Output = T; @@ -48,11 +54,13 @@ impl<Fut, F, T> Future for Map<Fut, F> MapProj::Incomplete { future, .. } => { let output = ready!(future.poll(cx)); match self.project_replace(Map::Complete) { - MapProjOwn::Incomplete { f, .. } => Poll::Ready(f.call_once(output)), - MapProjOwn::Complete => unreachable!(), + MapProjReplace::Incomplete { f, .. } => Poll::Ready(f.call_once(output)), + MapProjReplace::Complete => unreachable!(), } - }, - MapProj::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"), + } + MapProj::Complete => { + panic!("Map must not be polled after it returned `Poll::Ready`") + } } } } diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index f5d5dd2..f01d346 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -114,7 +114,7 @@ pub use self::remote_handle::{Remote, RemoteHandle}; mod shared; #[cfg(feature = "std")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 -pub use self::shared::Shared; +pub use self::shared::{Shared, WeakShared}; impl<T: ?Sized> FutureExt for T where T: Future {} diff --git a/src/future/future/remote_handle.rs b/src/future/future/remote_handle.rs index 598f63c..0d33ea5 100644 --- a/src/future/future/remote_handle.rs +++ b/src/future/future/remote_handle.rs @@ -4,6 +4,7 @@ use { futures_core::{ future::Future, task::{Context, Poll}, + ready, }, std::{ any::Any, @@ -16,7 +17,7 @@ use { }, thread, }, - pin_project::pin_project, + pin_project_lite::pin_project, }; /// The handle to a remote future returned by @@ -69,16 +70,17 @@ impl<T: 'static> Future for RemoteHandle<T> { type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'static)>>; -/// A future which sends its output to the corresponding `RemoteHandle`. -/// Created by [`remote_handle`](crate::future::FutureExt::remote_handle). -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[cfg_attr(docsrs, doc(cfg(feature = "channel")))] -pub struct Remote<Fut: Future> { - tx: Option<Sender<SendMsg<Fut>>>, - keep_running: Arc<AtomicBool>, - #[pin] - future: CatchUnwind<AssertUnwindSafe<Fut>>, +pin_project! { + /// A future which sends its output to the corresponding `RemoteHandle`. + /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle). + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] + pub struct Remote<Fut: Future> { + tx: Option<Sender<SendMsg<Fut>>>, + keep_running: Arc<AtomicBool>, + #[pin] + future: CatchUnwind<AssertUnwindSafe<Fut>>, + } } impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> { @@ -95,11 +97,11 @@ impl<Fut: Future> Future for Remote<Fut> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let this = self.project(); - if let Poll::Ready(_) = this.tx.as_mut().unwrap().poll_canceled(cx) { - if !this.keep_running.load(Ordering::SeqCst) { - // Cancelled, bail out - return Poll::Ready(()) - } + if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready() + && !this.keep_running.load(Ordering::SeqCst) + { + // Cancelled, bail out + return Poll::Ready(()); } let output = ready!(this.future.poll(cx)); diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs index 79ad5c3..53635b5 100644 --- a/src/future/future/shared.rs +++ b/src/future/future/shared.rs @@ -7,7 +7,7 @@ use std::fmt; use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, SeqCst}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; /// Future for the [`shared`](super::FutureExt::shared) method. #[must_use = "futures do nothing unless you `.await` or poll them"] @@ -26,6 +26,9 @@ struct Notifier { wakers: Mutex<Option<Slab<Option<Waker>>>>, } +/// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`. +pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>); + // The future itself is polled behind the `Arc`, so it won't be moved // when `Shared` is moved. impl<Fut: Future> Unpin for Shared<Fut> {} @@ -45,6 +48,12 @@ impl<Fut: Future> fmt::Debug for Inner<Fut> { } } +impl<Fut: Future> fmt::Debug for WeakShared<Fut> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WeakShared").finish() + } +} + enum FutureOrOutput<Fut: Future> { Future(Fut), Output(Fut::Output), @@ -72,7 +81,7 @@ const POISONED: usize = 3; const NULL_WAKER_KEY: usize = usize::max_value(); impl<Fut: Future> Shared<Fut> { - pub(super) fn new(future: Fut) -> Shared<Fut> { + pub(super) fn new(future: Fut) -> Self { let inner = Inner { future_or_output: UnsafeCell::new(FutureOrOutput::Future(future)), notifier: Arc::new(Notifier { @@ -81,7 +90,7 @@ impl<Fut: Future> Shared<Fut> { }), }; - Shared { + Self { inner: Some(Arc::new(inner)), waker_key: NULL_WAKER_KEY, } @@ -107,6 +116,16 @@ where } None } + + /// Creates a new [`WeakShared`] for this [`Shared`]. + /// + /// Returns [`None`] if it has already been polled to completion. + pub fn downgrade(&self) -> Option<WeakShared<Fut>> { + if let Some(inner) = self.inner.as_ref() { + return Some(WeakShared(Arc::downgrade(inner))); + } + None + } } impl<Fut> Inner<Fut> @@ -118,7 +137,7 @@ where /// is `COMPLETE` unsafe fn output(&self) -> &Fut::Output { match &*self.future_or_output.get() { - FutureOrOutput::Output(ref item) => &item, + FutureOrOutput::Output(ref item) => item, FutureOrOutput::Future(_) => unreachable!(), } } @@ -191,7 +210,12 @@ where inner.record_waker(&mut this.waker_key, cx); - match inner.notifier.state.compare_and_swap(IDLE, POLLING, SeqCst) { + match inner + .notifier + .state + .compare_exchange(IDLE, POLLING, SeqCst, SeqCst) + .unwrap_or_else(|x| x) + { IDLE => { // Lock acquired, fall through } @@ -236,14 +260,18 @@ where match future.poll(&mut cx) { Poll::Pending => { - match inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) { - POLLING => { - // Success - drop(_reset); - this.inner = Some(inner); - return Poll::Pending; - } - _ => unreachable!(), + if inner + .notifier + .state + .compare_exchange(POLLING, IDLE, SeqCst, SeqCst) + .is_ok() + { + // Success + drop(_reset); + this.inner = Some(inner); + return Poll::Pending; + } else { + unreachable!() } } Poll::Ready(output) => output, @@ -278,7 +306,7 @@ where Fut: Future, { fn clone(&self) -> Self { - Shared { + Self { inner: self.inner.clone(), waker_key: NULL_WAKER_KEY, } @@ -314,3 +342,17 @@ impl ArcWake for Notifier { } } } + +impl<Fut: Future> WeakShared<Fut> +{ + /// Attempts to upgrade this [`WeakShared`] into a [`Shared`]. + /// + /// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled + /// to completion. + pub fn upgrade(&self) -> Option<Shared<Fut>> { + Some(Shared { + inner: Some(self.0.upgrade()?), + waker_key: NULL_WAKER_KEY, + }) + } +} diff --git a/src/future/join.rs b/src/future/join.rs index 363e119..cfe53a7 100644 --- a/src/future/join.rs +++ b/src/future/join.rs @@ -5,7 +5,7 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use super::assert_future; @@ -14,11 +14,12 @@ macro_rules! generate { $(#[$doc:meta])* ($Join:ident, <$($Fut:ident),*>), )*) => ($( - $(#[$doc])* - #[pin_project] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct $Join<$($Fut: Future),*> { - $(#[pin] $Fut: MaybeDone<$Fut>,)* + pin_project! { + $(#[$doc])* + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct $Join<$($Fut: Future),*> { + $(#[pin] $Fut: MaybeDone<$Fut>,)* + } } impl<$($Fut),*> fmt::Debug for $Join<$($Fut),*> @@ -36,8 +37,8 @@ macro_rules! generate { } impl<$($Fut: Future),*> $Join<$($Fut),*> { - fn new($($Fut: $Fut),*) -> $Join<$($Fut),*> { - $Join { + fn new($($Fut: $Fut),*) -> Self { + Self { $($Fut: maybe_done($Fut)),* } } diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs index 5120a9b..bb5579e 100644 --- a/src/future/maybe_done.rs +++ b/src/future/maybe_done.rs @@ -1,18 +1,18 @@ //! Definition of the MaybeDone combinator +use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; /// A future that may have completed. /// /// This is created by the [`maybe_done()`] function. -#[pin_project(project = MaybeDoneProj, project_replace = MaybeDoneProjOwn)] #[derive(Debug)] pub enum MaybeDone<Fut: Future> { /// A not-yet-completed future - Future(#[pin] Fut), + Future(/* #[pin] */ Fut), /// The output of the completed future Done(Fut::Output), /// The empty variant after the result of a [`MaybeDone`] has been @@ -20,6 +20,8 @@ pub enum MaybeDone<Fut: Future> { Gone, } +impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} + /// Wraps a future into a `MaybeDone` /// /// # Examples @@ -48,9 +50,11 @@ impl<Fut: Future> MaybeDone<Fut> { /// has not yet been called. #[inline] pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> { - match self.project() { - MaybeDoneProj::Done(res) => Some(res), - _ => None, + unsafe { + match self.get_unchecked_mut() { + MaybeDone::Done(res) => Some(res), + _ => None, + } } } @@ -59,12 +63,14 @@ impl<Fut: Future> MaybeDone<Fut> { #[inline] pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> { match &*self { - MaybeDone::Done(_) => {} - MaybeDone::Future(_) | MaybeDone::Gone => return None, + Self::Done(_) => {} + Self::Future(_) | Self::Gone => return None, } - match self.project_replace(MaybeDone::Gone) { - MaybeDoneProjOwn::Done(output) => Some(output), - _ => unreachable!(), + unsafe { + match mem::replace(self.get_unchecked_mut(), Self::Gone) { + MaybeDone::Done(output) => Some(output), + _ => unreachable!(), + } } } } @@ -72,8 +78,8 @@ impl<Fut: Future> MaybeDone<Fut> { impl<Fut: Future> FusedFuture for MaybeDone<Fut> { fn is_terminated(&self) -> bool { match self { - MaybeDone::Future(_) => false, - MaybeDone::Done(_) | MaybeDone::Gone => true, + Self::Future(_) => false, + Self::Done(_) | Self::Gone => true, } } } @@ -82,13 +88,15 @@ impl<Fut: Future> Future for MaybeDone<Fut> { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match self.as_mut().project() { - MaybeDoneProj::Future(f) => { - let res = ready!(f.poll(cx)); - self.set(MaybeDone::Done(res)); + unsafe { + match self.as_mut().get_unchecked_mut() { + MaybeDone::Future(f) => { + let res = ready!(Pin::new_unchecked(f).poll(cx)); + self.set(Self::Done(res)); + } + MaybeDone::Done(_) => {} + MaybeDone::Gone => panic!("MaybeDone polled after value taken"), } - MaybeDoneProj::Done(_) => {} - MaybeDoneProj::Gone => panic!("MaybeDone polled after value taken"), } Poll::Ready(()) } diff --git a/src/future/mod.rs b/src/future/mod.rs index 3f19c19..ab29823 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -1,8 +1,13 @@ -//! Futures +//! Asynchronous values. //! -//! This module contains a number of functions for working with `Future`s, -//! including the [`FutureExt`] trait and the [`TryFutureExt`] trait which add -//! methods to `Future` types. +//! This module contains: +//! +//! - The [`Future`] trait. +//! - The [`FutureExt`] and [`TryFutureExt`] trait, which provides adapters for +//! chaining and composing futures. +//! - Top-level future combinators like [`lazy`](lazy()) which creates a future +//! from a closure that defines its return value, and [`ready`](ready()), +//! which constructs a future with an immediate defined value. #[cfg(feature = "alloc")] pub use futures_core::future::{BoxFuture, LocalBoxFuture}; @@ -28,7 +33,7 @@ pub use self::future::CatchUnwind; pub use self::future::{Remote, RemoteHandle}; #[cfg(feature = "std")] -pub use self::future::Shared; +pub use self::future::{Shared, WeakShared}; mod try_future; pub use self::try_future::{ diff --git a/src/future/option.rs b/src/future/option.rs index 88be009..85939d6 100644 --- a/src/future/option.rs +++ b/src/future/option.rs @@ -3,29 +3,33 @@ use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// A future representing a value which may or may not be present. -/// -/// Created by the [`From`] implementation for [`Option`](std::option::Option). -/// -/// # Examples -/// -/// ``` -/// # futures::executor::block_on(async { -/// use futures::future::OptionFuture; -/// -/// let mut a: OptionFuture<_> = Some(async { 123 }).into(); -/// assert_eq!(a.await, Some(123)); -/// -/// a = None.into(); -/// assert_eq!(a.await, None); -/// # }); -/// ``` -#[pin_project] -#[derive(Debug, Clone)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct OptionFuture<F>(#[pin] Option<F>); +pin_project! { + /// A future representing a value which may or may not be present. + /// + /// Created by the [`From`] implementation for [`Option`](std::option::Option). + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::future::OptionFuture; + /// + /// let mut a: OptionFuture<_> = Some(async { 123 }).into(); + /// assert_eq!(a.await, Some(123)); + /// + /// a = None.into(); + /// assert_eq!(a.await, None); + /// # }); + /// ``` + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct OptionFuture<F> { + #[pin] + inner: Option<F>, + } +} impl<F: Future> Future for OptionFuture<F> { type Output = Option<F::Output>; @@ -34,7 +38,7 @@ impl<F: Future> Future for OptionFuture<F> { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - match self.project().0.as_pin_mut() { + match self.project().inner.as_pin_mut() { Some(x) => x.poll(cx).map(Some), None => Poll::Ready(None), } @@ -43,7 +47,7 @@ impl<F: Future> Future for OptionFuture<F> { impl<F: FusedFuture> FusedFuture for OptionFuture<F> { fn is_terminated(&self) -> bool { - match &self.0 { + match &self.inner { Some(x) => x.is_terminated(), None => true, } @@ -52,6 +56,6 @@ impl<F: FusedFuture> FusedFuture for OptionFuture<F> { impl<T> From<Option<T>> for OptionFuture<T> { fn from(option: Option<T>) -> Self { - OptionFuture(option) + Self { inner: option } } } diff --git a/src/future/try_future/into_future.rs b/src/future/try_future/into_future.rs index 240bb1b..e88d603 100644 --- a/src/future/try_future/into_future.rs +++ b/src/future/try_future/into_future.rs @@ -1,23 +1,27 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`into_future`](super::TryFutureExt::into_future) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct IntoFuture<Fut>(#[pin] Fut); +pin_project! { + /// Future for the [`into_future`](super::TryFutureExt::into_future) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct IntoFuture<Fut> { + #[pin] + future: Fut, + } +} impl<Fut> IntoFuture<Fut> { #[inline] - pub(crate) fn new(future: Fut) -> IntoFuture<Fut> { - IntoFuture(future) + pub(crate) fn new(future: Fut) -> Self { + Self { future } } } impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> { - fn is_terminated(&self) -> bool { self.0.is_terminated() } + fn is_terminated(&self) -> bool { self.future.is_terminated() } } impl<Fut: TryFuture> Future for IntoFuture<Fut> { @@ -28,6 +32,6 @@ impl<Fut: TryFuture> Future for IntoFuture<Fut> { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { - self.project().0.try_poll(cx) + self.project().future.try_poll(cx) } } diff --git a/src/future/try_future/try_flatten.rs b/src/future/try_future/try_flatten.rs index 2bcadc5..5241b27 100644 --- a/src/future/try_future/try_flatten.rs +++ b/src/future/try_future/try_flatten.rs @@ -1,22 +1,25 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream, TryStream}; #[cfg(feature = "sink")] use futures_sink::Sink; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -#[pin_project(project = TryFlattenProj)] -#[derive(Debug)] -pub enum TryFlatten<Fut1, Fut2> { - First(#[pin] Fut1), - Second(#[pin] Fut2), - Empty, +pin_project! { + #[project = TryFlattenProj] + #[derive(Debug)] + pub enum TryFlatten<Fut1, Fut2> { + First { #[pin] f: Fut1 }, + Second { #[pin] f: Fut2 }, + Empty, + } } impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> { pub(crate) fn new(future: Fut1) -> Self { - TryFlatten::First(future) + Self::First { f: future } } } @@ -26,7 +29,7 @@ impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok> { fn is_terminated(&self) -> bool { match self { - TryFlatten::Empty => true, + Self::Empty => true, _ => false, } } @@ -41,18 +44,18 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok> fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First(f) => { + TryFlattenProj::First { f } => { match ready!(f.try_poll(cx)) { - Ok(f) => self.set(TryFlatten::Second(f)), + Ok(f) => self.set(Self::Second { f }), Err(e) => { - self.set(TryFlatten::Empty); + self.set(Self::Empty); break Err(e); } } }, - TryFlattenProj::Second(f) => { + TryFlattenProj::Second { f } => { let output = ready!(f.try_poll(cx)); - self.set(TryFlatten::Empty); + self.set(Self::Empty); break output; }, TryFlattenProj::Empty => panic!("TryFlatten polled after completion"), @@ -67,7 +70,7 @@ impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok> { fn is_terminated(&self) -> bool { match self { - TryFlatten::Empty => true, + Self::Empty => true, _ => false, } } @@ -82,19 +85,19 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok> fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First(f) => { + TryFlattenProj::First { f } => { match ready!(f.try_poll(cx)) { - Ok(f) => self.set(TryFlatten::Second(f)), + Ok(f) => self.set(Self::Second { f }), Err(e) => { - self.set(TryFlatten::Empty); + self.set(Self::Empty); break Some(Err(e)); } } }, - TryFlattenProj::Second(f) => { + TryFlattenProj::Second { f } => { let output = ready!(f.try_poll_next(cx)); if output.is_none() { - self.set(TryFlatten::Empty); + self.set(Self::Empty); } break output; }, @@ -119,16 +122,16 @@ where ) -> Poll<Result<(), Self::Error>> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenProj::First(f) => { + TryFlattenProj::First { f } => { match ready!(f.try_poll(cx)) { - Ok(f) => self.set(TryFlatten::Second(f)), + Ok(f) => self.set(Self::Second { f }), Err(e) => { - self.set(TryFlatten::Empty); + self.set(Self::Empty); break Err(e); } } }, - TryFlattenProj::Second(f) => { + TryFlattenProj::Second { f } => { break ready!(f.poll_ready(cx)); }, TryFlattenProj::Empty => panic!("poll_ready called after eof"), @@ -138,16 +141,16 @@ where fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { match self.project() { - TryFlattenProj::First(_) => panic!("poll_ready not called first"), - TryFlattenProj::Second(f) => f.start_send(item), + TryFlattenProj::First { .. } => panic!("poll_ready not called first"), + TryFlattenProj::Second { f } => f.start_send(item), TryFlattenProj::Empty => panic!("start_send called after eof"), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match self.project() { - TryFlattenProj::First(_) => Poll::Ready(Ok(())), - TryFlattenProj::Second(f) => f.poll_flush(cx), + TryFlattenProj::First { .. } => Poll::Ready(Ok(())), + TryFlattenProj::Second { f } => f.poll_flush(cx), TryFlattenProj::Empty => panic!("poll_flush called after eof"), } } @@ -157,11 +160,11 @@ where cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { let res = match self.as_mut().project() { - TryFlattenProj::Second(f) => f.poll_close(cx), + TryFlattenProj::Second { f } => f.poll_close(cx), _ => Poll::Ready(Ok(())), }; if res.is_ready() { - self.set(TryFlatten::Empty); + self.set(Self::Empty); } res } diff --git a/src/future/try_future/try_flatten_err.rs b/src/future/try_future/try_flatten_err.rs index 480f8c3..2e67f11 100644 --- a/src/future/try_future/try_flatten_err.rs +++ b/src/future/try_future/try_flatten_err.rs @@ -1,19 +1,22 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::ready; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -#[pin_project(project = TryFlattenErrProj)] -#[derive(Debug)] -pub enum TryFlattenErr<Fut1, Fut2> { - First(#[pin] Fut1), - Second(#[pin] Fut2), - Empty, +pin_project! { + #[project = TryFlattenErrProj] + #[derive(Debug)] + pub enum TryFlattenErr<Fut1, Fut2> { + First { #[pin] f: Fut1 }, + Second { #[pin] f: Fut2 }, + Empty, + } } impl<Fut1, Fut2> TryFlattenErr<Fut1, Fut2> { pub(crate) fn new(future: Fut1) -> Self { - TryFlattenErr::First(future) + Self::First { f: future } } } @@ -23,7 +26,7 @@ impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error> { fn is_terminated(&self) -> bool { match self { - TryFlattenErr::Empty => true, + Self::Empty => true, _ => false, } } @@ -38,18 +41,18 @@ impl<Fut> Future for TryFlattenErr<Fut, Fut::Error> fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { Poll::Ready(loop { match self.as_mut().project() { - TryFlattenErrProj::First(f) => { + TryFlattenErrProj::First { f } => { match ready!(f.try_poll(cx)) { - Err(f) => self.set(TryFlattenErr::Second(f)), + Err(f) => self.set(Self::Second { f }), Ok(e) => { - self.set(TryFlattenErr::Empty); + self.set(Self::Empty); break Ok(e); } } }, - TryFlattenErrProj::Second(f) => { + TryFlattenErrProj::Second { f } => { let output = ready!(f.try_poll(cx)); - self.set(TryFlattenErr::Empty); + self.set(Self::Empty); break output; }, TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"), diff --git a/src/future/try_join.rs b/src/future/try_join.rs index b4a3b98..25ccdde 100644 --- a/src/future/try_join.rs +++ b/src/future/try_join.rs @@ -5,19 +5,20 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; macro_rules! generate { ($( $(#[$doc:meta])* ($Join:ident, <Fut1, $($Fut:ident),*>), )*) => ($( - $(#[$doc])* - #[pin_project] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct $Join<Fut1: TryFuture, $($Fut: TryFuture),*> { - #[pin] Fut1: TryMaybeDone<Fut1>, - $(#[pin] $Fut: TryMaybeDone<$Fut>,)* + pin_project! { + $(#[$doc])* + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct $Join<Fut1: TryFuture, $($Fut: TryFuture),*> { + #[pin] Fut1: TryMaybeDone<Fut1>, + $(#[pin] $Fut: TryMaybeDone<$Fut>,)* + } } impl<Fut1, $($Fut),*> fmt::Debug for $Join<Fut1, $($Fut),*> @@ -46,8 +47,8 @@ macro_rules! generate { $Fut: TryFuture<Error=Fut1::Error> ),* { - fn new(Fut1: Fut1, $($Fut: $Fut),*) -> $Join<Fut1, $($Fut),*> { - $Join { + fn new(Fut1: Fut1, $($Fut: $Fut),*) -> Self { + Self { Fut1: try_maybe_done(Fut1), $($Fut: try_maybe_done($Fut)),* } diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs index b38b038..90067e9 100644 --- a/src/future/try_maybe_done.rs +++ b/src/future/try_maybe_done.rs @@ -1,18 +1,18 @@ //! Definition of the TryMaybeDone combinator +use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::ready; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; /// A future that may have completed with an error. /// /// This is created by the [`try_maybe_done()`] function. -#[pin_project(project = TryMaybeDoneProj, project_replace = TryMaybeDoneProjOwn)] #[derive(Debug)] pub enum TryMaybeDone<Fut: TryFuture> { /// A not-yet-completed future - Future(#[pin] Fut), + Future(/* #[pin] */ Fut), /// The output of the completed future Done(Fut::Ok), /// The empty variant after the result of a [`TryMaybeDone`] has been @@ -21,6 +21,8 @@ pub enum TryMaybeDone<Fut: TryFuture> { Gone, } +impl<Fut: TryFuture + Unpin> Unpin for TryMaybeDone<Fut> {} + /// Wraps a future into a `TryMaybeDone` pub fn try_maybe_done<Fut: TryFuture>(future: Fut) -> TryMaybeDone<Fut> { TryMaybeDone::Future(future) @@ -33,9 +35,11 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> { /// has not yet been called. #[inline] pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Ok> { - match self.project() { - TryMaybeDoneProj::Done(res) => Some(res), - _ => None, + unsafe { + match self.get_unchecked_mut() { + TryMaybeDone::Done(res) => Some(res), + _ => None, + } } } @@ -44,12 +48,14 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> { #[inline] pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> { match &*self { - TryMaybeDone::Done(_) => {}, - TryMaybeDone::Future(_) | TryMaybeDone::Gone => return None, + Self::Done(_) => {}, + Self::Future(_) | Self::Gone => return None, } - match self.project_replace(TryMaybeDone::Gone) { - TryMaybeDoneProjOwn::Done(output) => Some(output), - _ => unreachable!() + unsafe { + match mem::replace(self.get_unchecked_mut(), Self::Gone) { + TryMaybeDone::Done(output) => Some(output), + _ => unreachable!() + } } } } @@ -57,8 +63,8 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> { impl<Fut: TryFuture> FusedFuture for TryMaybeDone<Fut> { fn is_terminated(&self) -> bool { match self { - TryMaybeDone::Future(_) => false, - TryMaybeDone::Done(_) | TryMaybeDone::Gone => true, + Self::Future(_) => false, + Self::Done(_) | Self::Gone => true, } } } @@ -67,18 +73,20 @@ impl<Fut: TryFuture> Future for TryMaybeDone<Fut> { type Output = Result<(), Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match self.as_mut().project() { - TryMaybeDoneProj::Future(f) => { - match ready!(f.try_poll(cx)) { - Ok(res) => self.set(TryMaybeDone::Done(res)), - Err(e) => { - self.set(TryMaybeDone::Gone); - return Poll::Ready(Err(e)); + unsafe { + match self.as_mut().get_unchecked_mut() { + TryMaybeDone::Future(f) => { + match ready!(Pin::new_unchecked(f).try_poll(cx)) { + Ok(res) => self.set(Self::Done(res)), + Err(e) => { + self.set(Self::Gone); + return Poll::Ready(Err(e)); + } } - } - }, - TryMaybeDoneProj::Done(_) => {}, - TryMaybeDoneProj::Gone => panic!("TryMaybeDone polled after value taken"), + }, + TryMaybeDone::Done(_) => {}, + TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"), + } } Poll::Ready(Ok(())) } diff --git a/src/io/allow_std.rs b/src/io/allow_std.rs index 346e9ba..9aa8eb4 100644 --- a/src/io/allow_std.rs +++ b/src/io/allow_std.rs @@ -41,7 +41,7 @@ macro_rules! try_with_interrupt { impl<T> AllowStdIo<T> { /// Creates a new `AllowStdIo` from an existing IO object. pub fn new(io: T) -> Self { - AllowStdIo(io) + Self(io) } /// Returns a reference to the contained IO object. diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 2755667..270a086 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -1,39 +1,41 @@ +use futures_core::ready; use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::io::{self, Read}; use std::pin::Pin; use std::{cmp, fmt}; use super::DEFAULT_BUF_SIZE; -/// The `BufReader` struct adds buffering to any reader. -/// -/// It can be excessively inefficient to work directly with a [`AsyncRead`] -/// instance. A `BufReader` performs large, infrequent reads on the underlying -/// [`AsyncRead`] and maintains an in-memory buffer of the results. -/// -/// `BufReader` can improve the speed of programs that make *small* and -/// *repeated* read calls to the same file or network socket. It does not -/// help when reading very large amounts at once, or reading just one or a few -/// times. It also provides no advantage when reading from a source that is -/// already in memory, like a `Vec<u8>`. -/// -/// When the `BufReader` is dropped, the contents of its buffer will be -/// discarded. Creating multiple instances of a `BufReader` on the same -/// stream can cause data loss. -/// -/// [`AsyncRead`]: futures_io::AsyncRead -/// -// TODO: Examples -#[pin_project] -pub struct BufReader<R> { - #[pin] - inner: R, - buffer: Box<[u8]>, - pos: usize, - cap: usize, +pin_project! { + /// The `BufReader` struct adds buffering to any reader. + /// + /// It can be excessively inefficient to work directly with a [`AsyncRead`] + /// instance. A `BufReader` performs large, infrequent reads on the underlying + /// [`AsyncRead`] and maintains an in-memory buffer of the results. + /// + /// `BufReader` can improve the speed of programs that make *small* and + /// *repeated* read calls to the same file or network socket. It does not + /// help when reading very large amounts at once, or reading just one or a few + /// times. It also provides no advantage when reading from a source that is + /// already in memory, like a `Vec<u8>`. + /// + /// When the `BufReader` is dropped, the contents of its buffer will be + /// discarded. Creating multiple instances of a `BufReader` on the same + /// stream can cause data loss. + /// + /// [`AsyncRead`]: futures_io::AsyncRead + /// + // TODO: Examples + pub struct BufReader<R> { + #[pin] + inner: R, + buffer: Box<[u8]>, + pos: usize, + cap: usize, + } } impl<R: AsyncRead> BufReader<R> { diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index ed9196d..991a365 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -1,38 +1,40 @@ +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, SeekFrom}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::fmt; use std::io::{self, Write}; use std::pin::Pin; use super::DEFAULT_BUF_SIZE; -/// Wraps a writer and buffers its output. -/// -/// It can be excessively inefficient to work directly with something that -/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and -/// writes it to an underlying writer in large, infrequent batches. -/// -/// `BufWriter` can improve the speed of programs that make *small* and -/// *repeated* write calls to the same file or network socket. It does not -/// help when writing very large amounts at once, or writing just one or a few -/// times. It also provides no advantage when writing to a destination that is -/// in memory, like a `Vec<u8>`. -/// -/// When the `BufWriter` is dropped, the contents of its buffer will be -/// discarded. Creating multiple instances of a `BufWriter` on the same -/// stream can cause data loss. If you need to write out the contents of its -/// buffer, you must manually call flush before the writer is dropped. -/// -/// [`AsyncWrite`]: futures_io::AsyncWrite -/// [`flush`]: super::AsyncWriteExt::flush -/// -// TODO: Examples -#[pin_project] -pub struct BufWriter<W> { - #[pin] - inner: W, - buf: Vec<u8>, - written: usize, +pin_project! { + /// Wraps a writer and buffers its output. + /// + /// It can be excessively inefficient to work directly with something that + /// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and + /// writes it to an underlying writer in large, infrequent batches. + /// + /// `BufWriter` can improve the speed of programs that make *small* and + /// *repeated* write calls to the same file or network socket. It does not + /// help when writing very large amounts at once, or writing just one or a few + /// times. It also provides no advantage when writing to a destination that is + /// in memory, like a `Vec<u8>`. + /// + /// When the `BufWriter` is dropped, the contents of its buffer will be + /// discarded. Creating multiple instances of a `BufWriter` on the same + /// stream can cause data loss. If you need to write out the contents of its + /// buffer, you must manually call flush before the writer is dropped. + /// + /// [`AsyncWrite`]: futures_io::AsyncWrite + /// [`flush`]: super::AsyncWriteExt::flush + /// + // TODO: Examples + pub struct BufWriter<W> { + #[pin] + inner: W, + buf: Vec<u8>, + written: usize, + } } impl<W: AsyncWrite> BufWriter<W> { diff --git a/src/io/chain.rs b/src/io/chain.rs index 336307f..1b6a335 100644 --- a/src/io/chain.rs +++ b/src/io/chain.rs @@ -1,21 +1,23 @@ +use futures_core::ready; use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::fmt; use std::io; use std::pin::Pin; -/// Reader for the [`chain`](super::AsyncReadExt::chain) method. -#[pin_project] -#[must_use = "readers do nothing unless polled"] -pub struct Chain<T, U> { - #[pin] - first: T, - #[pin] - second: U, - done_first: bool, +pin_project! { + /// Reader for the [`chain`](super::AsyncReadExt::chain) method. + #[must_use = "readers do nothing unless polled"] + pub struct Chain<T, U> { + #[pin] + first: T, + #[pin] + second: U, + done_first: bool, + } } impl<T, U> Chain<T, U> diff --git a/src/io/close.rs b/src/io/close.rs index 4d56696..b944592 100644 --- a/src/io/close.rs +++ b/src/io/close.rs @@ -15,7 +15,7 @@ impl<W: ?Sized + Unpin> Unpin for Close<'_, W> {} impl<'a, W: AsyncWrite + ?Sized + Unpin> Close<'a, W> { pub(super) fn new(writer: &'a mut W) -> Self { - Close { writer } + Self { writer } } } diff --git a/src/io/copy.rs b/src/io/copy.rs index 491a680..bc59255 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -4,7 +4,7 @@ use futures_io::{AsyncRead, AsyncWrite}; use std::io; use std::pin::Pin; use super::{BufReader, copy_buf, CopyBuf}; -use pin_project::pin_project; +use pin_project_lite::pin_project; /// Creates a future which copies all the bytes from one object to another. /// @@ -41,13 +41,14 @@ where } } -/// Future for the [`copy()`] function. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Copy<'a, R, W: ?Sized> { - #[pin] - inner: CopyBuf<'a, BufReader<R>, W>, +pin_project! { + /// Future for the [`copy()`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Copy<'a, R, W: ?Sized> { + #[pin] + inner: CopyBuf<'a, BufReader<R>, W>, + } } impl<R: AsyncRead, W: AsyncWrite + Unpin + ?Sized> Future for Copy<'_, R, W> { diff --git a/src/io/copy_buf.rs b/src/io/copy_buf.rs index f47144a..6adf594 100644 --- a/src/io/copy_buf.rs +++ b/src/io/copy_buf.rs @@ -1,9 +1,10 @@ use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncWrite}; use std::io; use std::pin::Pin; -use pin_project::pin_project; +use pin_project_lite::pin_project; /// Creates a future which copies all the bytes from one object to another. /// @@ -42,15 +43,16 @@ where } } -/// Future for the [`copy_buf()`] function. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct CopyBuf<'a, R, W: ?Sized> { - #[pin] - reader: R, - writer: &'a mut W, - amt: u64, +pin_project! { + /// Future for the [`copy_buf()`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBuf<'a, R, W: ?Sized> { + #[pin] + reader: R, + writer: &'a mut W, + amt: u64, + } } impl<R, W> Future for CopyBuf<'_, R, W> diff --git a/src/io/cursor.rs b/src/io/cursor.rs index d135923..b11dbf5 100644 --- a/src/io/cursor.rs +++ b/src/io/cursor.rs @@ -42,8 +42,8 @@ impl<T> Cursor<T> { /// # fn force_inference(_: &Cursor<Vec<u8>>) {} /// # force_inference(&buff); /// ``` - pub fn new(inner: T) -> Cursor<T> { - Cursor { + pub fn new(inner: T) -> Self { + Self { inner: io::Cursor::new(inner), } } diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs index 015547e..6fb3ec7 100644 --- a/src/io/fill_buf.rs +++ b/src/io/fill_buf.rs @@ -15,7 +15,7 @@ impl<R: ?Sized> Unpin for FillBuf<'_, R> {} impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> { pub(super) fn new(reader: &'a mut R) -> Self { - FillBuf { reader: Some(reader) } + Self { reader: Some(reader) } } } diff --git a/src/io/flush.rs b/src/io/flush.rs index 70b867a..ece0a7c 100644 --- a/src/io/flush.rs +++ b/src/io/flush.rs @@ -15,7 +15,7 @@ impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {} impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> { pub(super) fn new(writer: &'a mut W) -> Self { - Flush { writer } + Self { writer } } } diff --git a/src/io/into_sink.rs b/src/io/into_sink.rs index 082c581..885ba2f 100644 --- a/src/io/into_sink.rs +++ b/src/io/into_sink.rs @@ -1,9 +1,10 @@ +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncWrite; use futures_sink::Sink; use std::io; use std::pin::Pin; -use pin_project::pin_project; +use pin_project_lite::pin_project; #[derive(Debug)] struct Block<Item> { @@ -11,22 +12,23 @@ struct Block<Item> { bytes: Item, } -/// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method. -#[pin_project] -#[must_use = "sinks do nothing unless polled"] -#[derive(Debug)] -#[cfg_attr(docsrs, doc(cfg(feature = "sink")))] -pub struct IntoSink<W, Item> { - #[pin] - writer: W, - /// An outstanding block for us to push into the underlying writer, along with an offset of how - /// far into this block we have written already. - buffer: Option<Block<Item>>, +pin_project! { + /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method. + #[must_use = "sinks do nothing unless polled"] + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] + pub struct IntoSink<W, Item> { + #[pin] + writer: W, + // An outstanding block for us to push into the underlying writer, along with an offset of how + // far into this block we have written already. + buffer: Option<Block<Item>>, + } } impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> { pub(super) fn new(writer: W) -> Self { - IntoSink { writer, buffer: None } + Self { writer, buffer: None } } /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_ diff --git a/src/io/lines.rs b/src/io/lines.rs index 90b993d..6ae7392 100644 --- a/src/io/lines.rs +++ b/src/io/lines.rs @@ -1,3 +1,4 @@ +use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_io::AsyncBufRead; @@ -5,19 +6,19 @@ use std::io; use std::mem; use std::pin::Pin; use super::read_line::read_line_internal; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`lines`](super::AsyncBufReadExt::lines) method. - -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Lines<R> { - #[pin] - reader: R, - buf: String, - bytes: Vec<u8>, - read: usize, +pin_project! { + /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Lines<R> { + #[pin] + reader: R, + buf: String, + bytes: Vec<u8>, + read: usize, + } } impl<R: AsyncBufRead> Lines<R> { diff --git a/src/io/mod.rs b/src/io/mod.rs index 51ee995..a7e2add 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -1,12 +1,19 @@ -//! IO +//! Asynchronous I/O. //! -//! This module contains a number of functions for working with -//! `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and `AsyncBufRead` types, including -//! the `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt`, and `AsyncBufReadExt` -//! traits which add methods to the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, -//! and `AsyncBufRead` types. +//! This module is the asynchronous version of `std::io`. It defines four +//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`], +//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the +//! standard library. However, these traits integrate with the asynchronous +//! task system, so that if an I/O object isn't ready for reading (or writing), +//! the thread is not blocked, and instead the current task is queued to be +//! woken when I/O is ready. //! -//! This module is only available when the `io` and `std` features of this +//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and +//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators +//! for operating with asynchronous I/O objects, including ways to work with +//! them using futures, streams and sinks. +//! +//! This module is only available when the `std` feature of this //! library is activated, and it is activated by default. #[cfg(feature = "io-compat")] diff --git a/src/io/read.rs b/src/io/read.rs index ea25959..677ba81 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -16,7 +16,7 @@ impl<R: ?Sized + Unpin> Unpin for Read<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> Read<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self { - Read { reader, buf } + Self { reader, buf } } } diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs index a2bbd40..f2e0440 100644 --- a/src/io/read_exact.rs +++ b/src/io/read_exact.rs @@ -1,4 +1,5 @@ use crate::io::AsyncRead; +use futures_core::ready; use futures_core::future::Future; use futures_core::task::{Context, Poll}; use std::io; @@ -17,7 +18,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadExact<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadExact<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self { - ReadExact { reader, buf } + Self { reader, buf } } } diff --git a/src/io/read_line.rs b/src/io/read_line.rs index 81d8415..d402c96 100644 --- a/src/io/read_line.rs +++ b/src/io/read_line.rs @@ -1,3 +1,4 @@ +use futures_core::ready; use futures_core::future::Future; use futures_core::task::{Context, Poll}; use futures_io::AsyncBufRead; @@ -38,7 +39,7 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>( read: &mut usize, ) -> Poll<io::Result<usize>> { let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read)); - if str::from_utf8(&bytes).is_err() { + if str::from_utf8(bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8")) })) diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs index 70b0578..7bd2c89 100644 --- a/src/io/read_to_end.rs +++ b/src/io/read_to_end.rs @@ -1,4 +1,5 @@ use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncRead; use std::io; @@ -27,11 +28,16 @@ impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> { } } -struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize } +struct Guard<'a> { + buf: &'a mut Vec<u8>, + len: usize, +} impl Drop for Guard<'_> { fn drop(&mut self) { - unsafe { self.buf.set_len(self.len); } + unsafe { + self.buf.set_len(self.len); + } } } @@ -50,8 +56,10 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( buf: &mut Vec<u8>, start_len: usize, ) -> Poll<io::Result<usize>> { - let mut g = Guard { len: buf.len(), buf }; - let ret; + let mut g = Guard { + len: buf.len(), + buf, + }; loop { if g.len == g.buf.len() { unsafe { @@ -62,24 +70,24 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( } } - match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { - Ok(0) => { - ret = Poll::Ready(Ok(g.len - start_len)); - break; - } - Ok(n) => g.len += n, - Err(e) => { - ret = Poll::Ready(Err(e)); - break; + let buf = &mut g.buf[g.len..]; + match ready!(rd.as_mut().poll_read(cx, buf)) { + Ok(0) => return Poll::Ready(Ok(g.len - start_len)), + Ok(n) => { + // We can't allow bogus values from read. If it is too large, the returned vec could have its length + // set past its capacity, or if it overflows the vec could be shortened which could create an invalid + // string if this is called via read_to_string. + assert!(n <= buf.len()); + g.len += n; } + Err(e) => return Poll::Ready(Err(e)), } } - - ret } impl<A> Future for ReadToEnd<'_, A> - where A: AsyncRead + ?Sized + Unpin, +where + A: AsyncRead + ?Sized + Unpin, { type Output = io::Result<usize>; diff --git a/src/io/read_to_string.rs b/src/io/read_to_string.rs index 113fe6a..9242654 100644 --- a/src/io/read_to_string.rs +++ b/src/io/read_to_string.rs @@ -1,4 +1,5 @@ use super::read_to_end::read_to_end_internal; +use futures_core::ready; use futures_core::future::Future; use futures_core::task::{Context, Poll}; use futures_io::AsyncRead; @@ -38,7 +39,7 @@ fn read_to_string_internal<R: AsyncRead + ?Sized>( start_len: usize, ) -> Poll<io::Result<usize>> { let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len)); - if str::from_utf8(&bytes).is_err() { + if str::from_utf8(bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new( io::ErrorKind::InvalidData, diff --git a/src/io/read_until.rs b/src/io/read_until.rs index 95c47e0..72b59ea 100644 --- a/src/io/read_until.rs +++ b/src/io/read_until.rs @@ -1,4 +1,5 @@ use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncBufRead; use std::io; diff --git a/src/io/repeat.rs b/src/io/repeat.rs index 84abd7f..4cefcb2 100644 --- a/src/io/repeat.rs +++ b/src/io/repeat.rs @@ -1,3 +1,4 @@ +use futures_core::ready; use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; diff --git a/src/io/split.rs b/src/io/split.rs index ccddd04..185c21c 100644 --- a/src/io/split.rs +++ b/src/io/split.rs @@ -1,4 +1,5 @@ use crate::lock::BiLock; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; use core::fmt; diff --git a/src/io/take.rs b/src/io/take.rs index 6179486..687a697 100644 --- a/src/io/take.rs +++ b/src/io/take.rs @@ -1,20 +1,22 @@ +use futures_core::ready; use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; use futures_io::{AsyncRead, AsyncBufRead}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{cmp, io}; use std::pin::Pin; -/// Reader for the [`take`](super::AsyncReadExt::take) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "readers do nothing unless you `.await` or poll them"] -pub struct Take<R> { - #[pin] - inner: R, - // Add '_' to avoid conflicts with `limit` method. - limit_: u64, +pin_project! { + /// Reader for the [`take`](super::AsyncReadExt::take) method. + #[derive(Debug)] + #[must_use = "readers do nothing unless you `.await` or poll them"] + pub struct Take<R> { + #[pin] + inner: R, + // Add '_' to avoid conflicts with `limit` method. + limit_: u64, + } } impl<R: AsyncRead> Take<R> { diff --git a/src/io/write_all.rs b/src/io/write_all.rs index f9ffb49..b134bf1 100644 --- a/src/io/write_all.rs +++ b/src/io/write_all.rs @@ -1,4 +1,5 @@ use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_io::AsyncWrite; use std::io; @@ -17,7 +18,7 @@ impl<W: ?Sized + Unpin> Unpin for WriteAll<'_, W> {} impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAll<'a, W> { pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self { - WriteAll { writer, buf } + Self { writer, buf } } } diff --git a/src/io/write_all_vectored.rs b/src/io/write_all_vectored.rs index ec28798..380604d 100644 --- a/src/io/write_all_vectored.rs +++ b/src/io/write_all_vectored.rs @@ -1,3 +1,4 @@ +use futures_core::ready; use futures_core::future::Future; use futures_core::task::{Context, Poll}; use futures_io::AsyncWrite; @@ -19,7 +20,7 @@ impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {} impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> { pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self { - WriteAllVectored { writer, bufs: IoSlice::advance(bufs, 0) } + Self { writer, bufs: IoSlice::advance(bufs, 0) } } } @@ -186,7 +187,7 @@ mod tests { (vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2]), IoSlice::new(&[3, 3, 3])], &[1, 1, 1, 2, 2, 2, 3, 3, 3]), ]; - for (mut input, wanted) in tests.into_iter() { + for (mut input, wanted) in tests { let mut dst = test_writer(2, 2); { let mut future = dst.write_all_vectored(&mut *input); @@ -4,22 +4,17 @@ #![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "read-initializer", feature(read_initializer))] #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))] - #![cfg_attr(not(feature = "std"), no_std)] -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] +#![warn( + missing_docs, + missing_debug_implementations, + rust_2018_idioms, + unreachable_pub +)] // It cannot be included in the published code because this lints have false positives in the minimum required version. #![cfg_attr(test, warn(single_use_lifetimes))] #![warn(clippy::all)] - -// mem::take requires Rust 1.40, matches! requires Rust 1.42 -// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941 -// get's implemented. -#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)] - #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] - -#![doc(html_root_url = "https://docs.rs/futures-util/0.3.7")] - #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] @@ -34,9 +29,6 @@ compile_error!("The `read-initializer` feature requires the `unstable` feature a #[cfg(feature = "alloc")] extern crate alloc; -#[macro_use(ready)] -extern crate futures_core; - // Macro re-exports pub use futures_core::ready; pub use pin_utils::pin_mut; @@ -54,7 +46,7 @@ pub use self::async_await::*; pub mod __private { pub use crate::*; pub use core::{ - option::Option::{self, Some, None}, + option::Option::{self, None, Some}, pin::Pin, result::Result::{Err, Ok}, }; @@ -81,10 +73,7 @@ macro_rules! delegate_sink { self.project().$field.poll_ready(cx) } - fn start_send( - self: core::pin::Pin<&mut Self>, - item: $item, - ) -> Result<(), Self::Error> { + fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> { self.project().$field.start_send(item) } @@ -101,7 +90,7 @@ macro_rules! delegate_sink { ) -> core::task::Poll<Result<(), Self::Error>> { self.project().$field.poll_close(cx) } - } + }; } macro_rules! delegate_future { @@ -112,7 +101,7 @@ macro_rules! delegate_future { ) -> core::task::Poll<Self::Output> { self.project().$field.poll(cx) } - } + }; } macro_rules! delegate_stream { @@ -126,34 +115,40 @@ macro_rules! delegate_stream { fn size_hint(&self) -> (usize, Option<usize>) { self.$field.size_hint() } - } + }; } #[cfg(feature = "io")] #[cfg(feature = "std")] macro_rules! delegate_async_write { ($field:ident) => { - fn poll_write(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, buf: &[u8]) - -> core::task::Poll<std::io::Result<usize>> - { + fn poll_write( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + buf: &[u8], + ) -> core::task::Poll<std::io::Result<usize>> { self.project().$field.poll_write(cx, buf) } - fn poll_write_vectored(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, bufs: &[std::io::IoSlice<'_>]) - -> core::task::Poll<std::io::Result<usize>> - { + fn poll_write_vectored( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> core::task::Poll<std::io::Result<usize>> { self.project().$field.poll_write_vectored(cx, bufs) } - fn poll_flush(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) - -> core::task::Poll<std::io::Result<()>> - { + fn poll_flush( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll<std::io::Result<()>> { self.project().$field.poll_flush(cx) } - fn poll_close(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) - -> core::task::Poll<std::io::Result<()>> - { + fn poll_close( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll<std::io::Result<()>> { self.project().$field.poll_close(cx) } - } + }; } #[cfg(feature = "io")] @@ -165,18 +160,22 @@ macro_rules! delegate_async_read { self.$field.initializer() } - fn poll_read(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, buf: &mut [u8]) - -> core::task::Poll<std::io::Result<usize>> - { + fn poll_read( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + buf: &mut [u8], + ) -> core::task::Poll<std::io::Result<usize>> { self.project().$field.poll_read(cx, buf) } - fn poll_read_vectored(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, bufs: &mut [std::io::IoSliceMut<'_>]) - -> core::task::Poll<std::io::Result<usize>> - { + fn poll_read_vectored( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + bufs: &mut [std::io::IoSliceMut<'_>], + ) -> core::task::Poll<std::io::Result<usize>> { self.project().$field.poll_read_vectored(cx, bufs) } - } + }; } #[cfg(feature = "io")] @@ -193,7 +192,7 @@ macro_rules! delegate_async_buf_read { fn consume(self: core::pin::Pin<&mut Self>, amt: usize) { self.project().$field.consume(amt) } - } + }; } macro_rules! delegate_access_inner { @@ -289,10 +288,11 @@ macro_rules! delegate_all { } }; ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => { - #[pin_project::pin_project] - #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] - $(#[$attr])* - pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner:$t } + pin_project_lite::pin_project! { + #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"] + $(#[$attr])* + pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t } + } impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* { $($($item)*)* @@ -308,16 +308,19 @@ macro_rules! delegate_all { } pub mod future; -#[doc(hidden)] pub use crate::future::{FutureExt, TryFutureExt}; +#[doc(hidden)] +pub use crate::future::{FutureExt, TryFutureExt}; pub mod stream; -#[doc(hidden)] pub use crate::stream::{StreamExt, TryStreamExt}; +#[doc(hidden)] +pub use crate::stream::{StreamExt, TryStreamExt}; #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] pub mod sink; #[cfg(feature = "sink")] -#[doc(hidden)] pub use crate::sink::SinkExt; +#[doc(hidden)] +pub use crate::sink::SinkExt; pub mod task; @@ -333,12 +336,11 @@ pub mod compat; pub mod io; #[cfg(feature = "io")] #[cfg(feature = "std")] -#[doc(hidden)] pub use crate::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt}; - -mod fns; +#[doc(hidden)] +pub use crate::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +#[cfg(feature = "alloc")] +pub mod lock; -cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - pub mod lock; -} +mod fns; +mod unfold_state; diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs index 3698406..600e16e 100644 --- a/src/lock/bilock.rs +++ b/src/lock/bilock.rs @@ -60,13 +60,13 @@ impl<T> BiLock<T> { /// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`. /// Similarly, reuniting the lock and extracting the inner value is only /// possible when `T` is `Unpin`. - pub fn new(t: T) -> (BiLock<T>, BiLock<T>) { + pub fn new(t: T) -> (Self, Self) { let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)), }); - (BiLock { arc: arc.clone() }, BiLock { arc }) + (Self { arc: arc.clone() }, Self { arc }) } /// Attempt to acquire this lock, returning `Pending` if it can't be diff --git a/src/lock/mod.rs b/src/lock/mod.rs index b252613..071eef6 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -3,18 +3,20 @@ //! This module is only available when the `std` or `alloc` feature of this //! library is activated, and it is activated by default. -#[cfg(feature = "std")] -mod mutex; -#[cfg(feature = "std")] -pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard}; +cfg_target_has_atomic! { + #[cfg(feature = "std")] + mod mutex; + #[cfg(feature = "std")] + pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard}; -#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] -#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] -#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] -mod bilock; -#[cfg(feature = "bilock")] -#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] -pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; -#[cfg(any(feature = "sink", feature = "io"))] -#[cfg(not(feature = "bilock"))] -pub(crate) use self::bilock::BiLock; + #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] + #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] + #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] + mod bilock; + #[cfg(feature = "bilock")] + #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] + pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; + #[cfg(any(feature = "sink", feature = "io"))] + #[cfg(not(feature = "bilock"))] + pub(crate) use self::bilock::BiLock; +} diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs index 84aeeda..a78de62 100644 --- a/src/lock/mutex.rs +++ b/src/lock/mutex.rs @@ -40,8 +40,8 @@ impl<T> From<T> for Mutex<T> { } impl<T: Default> Default for Mutex<T> { - fn default() -> Mutex<T> { - Mutex::new(Default::default()) + fn default() -> Self { + Self::new(Default::default()) } } @@ -53,15 +53,15 @@ enum Waiter { impl Waiter { fn register(&mut self, waker: &Waker) { match self { - Waiter::Waiting(w) if waker.will_wake(w) => {}, - _ => *self = Waiter::Waiting(waker.clone()), + Self::Waiting(w) if waker.will_wake(w) => {}, + _ => *self = Self::Waiting(waker.clone()), } } fn wake(&mut self) { - match mem::replace(self, Waiter::Woken) { - Waiter::Waiting(waker) => waker.wake(), - Waiter::Woken => {}, + match mem::replace(self, Self::Woken) { + Self::Waiting(waker) => waker.wake(), + Self::Woken => {}, } } } @@ -72,8 +72,8 @@ const HAS_WAITERS: usize = 1 << 1; impl<T> Mutex<T> { /// Creates a new futures-aware mutex. - pub fn new(t: T) -> Mutex<T> { - Mutex { + pub fn new(t: T) -> Self { + Self { state: AtomicUsize::new(0), waiters: StdMutex::new(Slab::new()), value: UnsafeCell::new(t), diff --git a/src/never.rs b/src/never.rs index 767c5af..e811f97 100644 --- a/src/never.rs +++ b/src/never.rs @@ -1,5 +1,6 @@ -//! Definition of the `Never` type, -//! a stand-in for the `!` type until it becomes stable. +//! This module contains the `Never` type. +//! +//! Values of this type can never be created and will never exist. /// A type with no possible values. /// diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs index 8176abd..8c58f4f 100644 --- a/src/sink/buffer.rs +++ b/src/sink/buffer.rs @@ -1,26 +1,28 @@ +use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use core::pin::Pin; use alloc::collections::VecDeque; -/// Sink for the [`buffer`](super::SinkExt::buffer) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "sinks do nothing unless polled"] -pub struct Buffer<Si, Item> { - #[pin] - sink: Si, - buf: VecDeque<Item>, - - // Track capacity separately from the `VecDeque`, which may be rounded up - capacity: usize, +pin_project! { + /// Sink for the [`buffer`](super::SinkExt::buffer) method. + #[derive(Debug)] + #[must_use = "sinks do nothing unless polled"] + pub struct Buffer<Si, Item> { + #[pin] + sink: Si, + buf: VecDeque<Item>, + + // Track capacity separately from the `VecDeque`, which may be rounded up + capacity: usize, + } } impl<Si: Sink<Item>, Item> Buffer<Si, Item> { pub(super) fn new(sink: Si, capacity: usize) -> Self { - Buffer { + Self { sink, buf: VecDeque::with_capacity(capacity), capacity, diff --git a/src/sink/close.rs b/src/sink/close.rs index 1514b41..4421d10 100644 --- a/src/sink/close.rs +++ b/src/sink/close.rs @@ -19,7 +19,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Close<'_, Si, Item> {} /// The sink itself is returned after closeing is complete. impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Close<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si) -> Self { - Close { + Self { sink, _phantom: PhantomData, } diff --git a/src/sink/err_into.rs b/src/sink/err_into.rs index b23ada7..3eb9940 100644 --- a/src/sink/err_into.rs +++ b/src/sink/err_into.rs @@ -1,15 +1,16 @@ use crate::sink::{SinkExt, SinkMapErr}; use futures_core::stream::{Stream, FusedStream}; use futures_sink::{Sink}; -use pin_project::pin_project; - -/// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "sinks do nothing unless polled"] -pub struct SinkErrInto<Si: Sink<Item>, Item, E> { - #[pin] - sink: SinkMapErr<Si, fn(Si::Error) -> E>, +use pin_project_lite::pin_project; + +pin_project! { + /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method. + #[derive(Debug)] + #[must_use = "sinks do nothing unless polled"] + pub struct SinkErrInto<Si: Sink<Item>, Item, E> { + #[pin] + sink: SinkMapErr<Si, fn(Si::Error) -> E>, + } } impl<Si, E, Item> SinkErrInto<Si, Item, E> @@ -17,7 +18,7 @@ impl<Si, E, Item> SinkErrInto<Si, Item, E> Si::Error: Into<E>, { pub(super) fn new(sink: Si) -> Self { - SinkErrInto { + Self { sink: SinkExt::sink_map_err(sink, Into::into), } } diff --git a/src/sink/fanout.rs b/src/sink/fanout.rs index d71d793..f351e86 100644 --- a/src/sink/fanout.rs +++ b/src/sink/fanout.rs @@ -2,24 +2,25 @@ use core::fmt::{Debug, Formatter, Result as FmtResult}; use core::pin::Pin; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::pin_project; - -/// Sink that clones incoming items and forwards them to two sinks at the same time. -/// -/// Backpressure from any downstream sink propagates up, which means that this sink -/// can only process items as fast as its _slowest_ downstream sink. -#[pin_project] -#[must_use = "sinks do nothing unless polled"] -pub struct Fanout<Si1, Si2> { - #[pin] - sink1: Si1, - #[pin] - sink2: Si2 +use pin_project_lite::pin_project; + +pin_project! { + /// Sink that clones incoming items and forwards them to two sinks at the same time. + /// + /// Backpressure from any downstream sink propagates up, which means that this sink + /// can only process items as fast as its _slowest_ downstream sink. + #[must_use = "sinks do nothing unless polled"] + pub struct Fanout<Si1, Si2> { + #[pin] + sink1: Si1, + #[pin] + sink2: Si2 + } } impl<Si1, Si2> Fanout<Si1, Si2> { - pub(super) fn new(sink1: Si1, sink2: Si2) -> Fanout<Si1, Si2> { - Fanout { sink1, sink2 } + pub(super) fn new(sink1: Si1, sink2: Si2) -> Self { + Self { sink1, sink2 } } /// Get a shared reference to the inner sinks. diff --git a/src/sink/feed.rs b/src/sink/feed.rs new file mode 100644 index 0000000..06df9a9 --- /dev/null +++ b/src/sink/feed.rs @@ -0,0 +1,49 @@ +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; + +/// Future for the [`feed`](super::SinkExt::feed) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Feed<'a, Si: ?Sized, Item> { + sink: &'a mut Si, + item: Option<Item>, +} + +// Pinning is never projected to children +impl<Si: Unpin + ?Sized, Item> Unpin for Feed<'_, Si, Item> {} + +impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> { + pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { + Feed { + sink, + item: Some(item), + } + } + + pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> { + Pin::new(self.sink) + } + + pub(super) fn is_item_pending(&self) -> bool { + self.item.is_some() + } +} + +impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> { + type Output = Result<(), Si::Error>; + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + let this = self.get_mut(); + let mut sink = Pin::new(&mut this.sink); + ready!(sink.as_mut().poll_ready(cx))?; + let item = this.item.take().expect("polled Feed after completion"); + sink.as_mut().start_send(item)?; + Poll::Ready(Ok(())) + } +} diff --git a/src/sink/flush.rs b/src/sink/flush.rs index 86fecc4..c06a221 100644 --- a/src/sink/flush.rs +++ b/src/sink/flush.rs @@ -23,7 +23,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Flush<'_, Si, Item> {} /// all current requests are processed. impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Flush<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si) -> Self { - Flush { + Self { sink, _phantom: PhantomData, } diff --git a/src/sink/map_err.rs b/src/sink/map_err.rs index 29994a7..2829344 100644 --- a/src/sink/map_err.rs +++ b/src/sink/map_err.rs @@ -2,21 +2,22 @@ use core::pin::Pin; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::{Sink}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method. -#[pin_project] -#[derive(Debug, Clone)] -#[must_use = "sinks do nothing unless polled"] -pub struct SinkMapErr<Si, F> { - #[pin] - sink: Si, - f: Option<F>, +pin_project! { + /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method. + #[derive(Debug, Clone)] + #[must_use = "sinks do nothing unless polled"] + pub struct SinkMapErr<Si, F> { + #[pin] + sink: Si, + f: Option<F>, + } } impl<Si, F> SinkMapErr<Si, F> { - pub(super) fn new(sink: Si, f: F) -> SinkMapErr<Si, F> { - SinkMapErr { sink, f: Some(f) } + pub(super) fn new(sink: Si, f: F) -> Self { + Self { sink, f: Some(f) } } delegate_access_inner!(sink, Si, ()); diff --git a/src/sink/mod.rs b/src/sink/mod.rs index b0e2c83..1a062d0 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -1,16 +1,16 @@ -//! Sinks +//! Asynchronous sinks. //! -//! This module contains a number of functions for working with `Sink`s, -//! including the `SinkExt` trait which adds methods to `Sink` types. +//! This module contains: //! -//! This module is only available when the `sink` feature of this -//! library is activated, and it is activated by default. +//! - The [`Sink`] trait, which allows you to asynchronously write data. +//! - The [`SinkExt`] trait, which provides adapters for chaining and composing +//! sinks. +use crate::future::Either; use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; -use crate::future::Either; #[cfg(feature = "compat")] use crate::compat::CompatSink; @@ -26,6 +26,9 @@ pub use self::drain::{drain, Drain}; mod fanout; pub use self::fanout::Fanout; +mod feed; +pub use self::feed::Feed; + mod flush; pub use self::flush::Flush; @@ -41,6 +44,9 @@ pub use self::send::Send; mod send_all; pub use self::send_all::SendAll; +mod unfold; +pub use self::unfold::{unfold, Unfold}; + mod with; pub use self::with::With; @@ -69,10 +75,11 @@ pub trait SinkExt<Item>: Sink<Item> { /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::map`. fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> - where F: FnMut(U) -> Fut, - Fut: Future<Output = Result<Item, E>>, - E: From<Self::Error>, - Self: Sized + where + F: FnMut(U) -> Fut, + Fut: Future<Output = Result<Item, E>>, + E: From<Self::Error>, + Self: Sized, { With::new(self, f) } @@ -110,9 +117,10 @@ pub trait SinkExt<Item>: Sink<Item> { /// # }); /// ``` fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> - where F: FnMut(U) -> St, - St: Stream<Item = Result<Item, Self::Error>>, - Self: Sized + where + F: FnMut(U) -> St, + St: Stream<Item = Result<Item, Self::Error>>, + Self: Sized, { WithFlatMap::new(self, f) } @@ -133,8 +141,9 @@ pub trait SinkExt<Item>: Sink<Item> { /// Transforms the error returned by the sink. fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> - where F: FnOnce(Self::Error) -> E, - Self: Sized, + where + F: FnOnce(Self::Error) -> E, + Self: Sized, { SinkMapErr::new(self, f) } @@ -143,13 +152,13 @@ pub trait SinkExt<Item>: Sink<Item> { /// /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`. fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E> - where Self: Sized, - Self::Error: Into<E>, + where + Self: Sized, + Self::Error: Into<E>, { SinkErrInto::new(self) } - /// Adds a fixed-size buffer to the current sink. /// /// The resulting sink will buffer up to `capacity` items when the @@ -164,14 +173,16 @@ pub trait SinkExt<Item>: Sink<Item> { /// library is activated, and it is activated by default. #[cfg(feature = "alloc")] fn buffer(self, capacity: usize) -> Buffer<Self, Item> - where Self: Sized, + where + Self: Sized, { Buffer::new(self, capacity) } /// Close the sink. fn close(&mut self) -> Close<'_, Self, Item> - where Self: Unpin, + where + Self: Unpin, { Close::new(self) } @@ -181,9 +192,10 @@ pub trait SinkExt<Item>: Sink<Item> { /// This adapter clones each incoming item and forwards it to both this as well as /// the other sink at the same time. fn fanout<Si>(self, other: Si) -> Fanout<Self, Si> - where Self: Sized, - Item: Clone, - Si: Sink<Item, Error=Self::Error> + where + Self: Sized, + Item: Clone, + Si: Sink<Item, Error = Self::Error>, { Fanout::new(self, other) } @@ -193,7 +205,8 @@ pub trait SinkExt<Item>: Sink<Item> { /// This adapter is intended to be used when you want to stop sending to the sink /// until all current requests are processed. fn flush(&mut self) -> Flush<'_, Self, Item> - where Self: Unpin, + where + Self: Unpin, { Flush::new(self) } @@ -202,14 +215,27 @@ pub trait SinkExt<Item>: Sink<Item> { /// into the sink, including flushing. /// /// Note that, **because of the flushing requirement, it is usually better - /// to batch together items to send via `send_all`, rather than flushing - /// between each item.** + /// to batch together items to send via `feed` or `send_all`, + /// rather than flushing between each item.** fn send(&mut self, item: Item) -> Send<'_, Self, Item> - where Self: Unpin, + where + Self: Unpin, { Send::new(self, item) } + /// A future that completes after the given item has been received + /// by the sink. + /// + /// Unlike `send`, the returned future does not flush the sink. + /// It is the caller's responsibility to ensure all pending items + /// are processed, which can be done via `flush` or `close`. + fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> + where Self: Unpin, + { + Feed::new(self, item) + } + /// A future that completes after the given stream has been fully processed /// into the sink, including flushing. /// @@ -221,12 +247,10 @@ pub trait SinkExt<Item>: Sink<Item> { /// Doing `sink.send_all(stream)` is roughly equivalent to /// `stream.forward(sink)`. The returned future will exhaust all items from /// `stream` and send them to `self`. - fn send_all<'a, St>( - &'a mut self, - stream: &'a mut St - ) -> SendAll<'a, Self, St> - where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, - Self: Unpin, + fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> + where + St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, + Self: Unpin, { SendAll::new(self, stream) } @@ -237,8 +261,9 @@ pub trait SinkExt<Item>: Sink<Item> { /// This can be used in combination with the `right_sink` method to write `if` /// statements that evaluate to different streams in different branches. fn left_sink<Si2>(self) -> Either<Self, Si2> - where Si2: Sink<Item, Error = Self::Error>, - Self: Sized + where + Si2: Sink<Item, Error = Self::Error>, + Self: Sized, { Either::Left(self) } @@ -249,8 +274,9 @@ pub trait SinkExt<Item>: Sink<Item> { /// This can be used in combination with the `left_sink` method to write `if` /// statements that evaluate to different streams in different branches. fn right_sink<Si1>(self) -> Either<Si1, Self> - where Si1: Sink<Item, Error = Self::Error>, - Self: Sized + where + Si1: Sink<Item, Error = Self::Error>, + Self: Sized, { Either::Right(self) } @@ -260,15 +286,17 @@ pub trait SinkExt<Item>: Sink<Item> { #[cfg(feature = "compat")] #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] fn compat(self) -> CompatSink<Self, Item> - where Self: Sized + Unpin, + where + Self: Sized + Unpin, { CompatSink::new(self) } - + /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`] /// sink types. fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> - where Self: Unpin + where + Self: Unpin, { Pin::new(self).poll_ready(cx) } @@ -276,7 +304,8 @@ pub trait SinkExt<Item>: Sink<Item> { /// A convenience method for calling [`Sink::start_send`] on [`Unpin`] /// sink types. fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> - where Self: Unpin + where + Self: Unpin, { Pin::new(self).start_send(item) } @@ -284,7 +313,8 @@ pub trait SinkExt<Item>: Sink<Item> { /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`] /// sink types. fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> - where Self: Unpin + where + Self: Unpin, { Pin::new(self).poll_flush(cx) } @@ -292,7 +322,8 @@ pub trait SinkExt<Item>: Sink<Item> { /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`] /// sink types. fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> - where Self: Unpin + where + Self: Unpin, { Pin::new(self).poll_close(cx) } diff --git a/src/sink/send.rs b/src/sink/send.rs index dc7f0be..384c22c 100644 --- a/src/sink/send.rs +++ b/src/sink/send.rs @@ -1,5 +1,7 @@ +use super::Feed; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::task::{Context, Poll}; use futures_sink::Sink; @@ -7,8 +9,7 @@ use futures_sink::Sink; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Send<'a, Si: ?Sized, Item> { - sink: &'a mut Si, - item: Option<Item>, + feed: Feed<'a, Si, Item>, } // Pinning is never projected to children @@ -16,9 +17,8 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {} impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { - Send { - sink, - item: Some(item), + Self { + feed: Feed::new(sink, item), } } } @@ -31,20 +31,15 @@ impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> { cx: &mut Context<'_>, ) -> Poll<Self::Output> { let this = &mut *self; - if let Some(item) = this.item.take() { - let mut sink = Pin::new(&mut this.sink); - match sink.as_mut().poll_ready(cx)? { - Poll::Ready(()) => sink.as_mut().start_send(item)?, - Poll::Pending => { - this.item = Some(item); - return Poll::Pending; - } - } + + if this.feed.is_item_pending() { + ready!(Pin::new(&mut this.feed).poll(cx))?; + debug_assert!(!this.feed.is_item_pending()); } // we're done sending the item, but want to block on flushing the // sink - ready!(Pin::new(&mut this.sink).poll_flush(cx))?; + ready!(this.feed.sink_pin_mut().poll_flush(cx))?; Poll::Ready(Ok(())) } diff --git a/src/sink/send_all.rs b/src/sink/send_all.rs index 255df4d..6a33459 100644 --- a/src/sink/send_all.rs +++ b/src/sink/send_all.rs @@ -2,6 +2,7 @@ use crate::stream::{StreamExt, TryStreamExt, Fuse}; use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{TryStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; @@ -49,8 +50,8 @@ where pub(super) fn new( sink: &'a mut Si, stream: &'a mut St, - ) -> SendAll<'a, Si, St> { - SendAll { + ) -> Self { + Self { sink, stream: stream.fuse(), buffered: None, diff --git a/src/sink/unfold.rs b/src/sink/unfold.rs new file mode 100644 index 0000000..1aab200 --- /dev/null +++ b/src/sink/unfold.rs @@ -0,0 +1,88 @@ +use crate::unfold_state::UnfoldState; +use core::{future::Future, pin::Pin}; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Sink for the [`unfold`] function. + #[derive(Debug)] + #[must_use = "sinks do nothing unless polled"] + pub struct Unfold<T, F, R> { + function: F, + #[pin] + state: UnfoldState<T, R>, + } +} + +/// Create a sink from a function which processes one item at a time. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::sink::{self, SinkExt}; +/// +/// let unfold = sink::unfold(0, |mut sum, i: i32| { +/// async move { +/// sum += i; +/// eprintln!("{}", i); +/// Ok::<_, futures::never::Never>(sum) +/// } +/// }); +/// futures::pin_mut!(unfold); +/// unfold.send(5).await?; +/// # Ok::<(), futures::never::Never>(()) }).unwrap(); +/// ``` +pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R> +where + F: FnMut(T, Item) -> R, + R: Future<Output = Result<T, E>>, +{ + Unfold { + function, + state: UnfoldState::Value { value: init }, + } +} + +impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R> +where + F: FnMut(T, Item) -> R, + R: Future<Output = Result<T, E>>, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + let mut this = self.project(); + let future = match this.state.as_mut().take_value() { + Some(value) => (this.function)(value, item), + None => panic!("start_send called without poll_ready being called first"), + }; + this.state.set(UnfoldState::Future { future }); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + let mut this = self.project(); + Poll::Ready(if let Some(future) = this.state.as_mut().project_future() { + match ready!(future.poll(cx)) { + Ok(state) => { + this.state.set(UnfoldState::Value { value: state }); + Ok(()) + } + Err(err) => Err(err), + } + } else { + Ok(()) + }) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) + } +} diff --git a/src/sink/with.rs b/src/sink/with.rs index 6329a0c..73b87b7 100644 --- a/src/sink/with.rs +++ b/src/sink/with.rs @@ -2,21 +2,23 @@ use core::fmt; use core::marker::PhantomData; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Sink for the [`with`](super::SinkExt::with) method. -#[pin_project] -#[must_use = "sinks do nothing unless polled"] -pub struct With<Si, Item, U, Fut, F> { - #[pin] - sink: Si, - f: F, - #[pin] - state: Option<Fut>, - _phantom: PhantomData<fn(U) -> Item>, +pin_project! { + /// Sink for the [`with`](super::SinkExt::with) method. + #[must_use = "sinks do nothing unless polled"] + pub struct With<Si, Item, U, Fut, F> { + #[pin] + sink: Si, + f: F, + #[pin] + state: Option<Fut>, + _phantom: PhantomData<fn(U) -> Item>, + } } impl<Si, Item, U, Fut, F> fmt::Debug for With<Si, Item, U, Fut, F> @@ -42,7 +44,7 @@ where Si: Sink<Item>, Fut: Future<Output = Result<Item, E>>, E: From<Si::Error>, { - With { + Self { state: None, sink, f, @@ -51,6 +53,22 @@ where Si: Sink<Item>, } } +impl<Si, Item, U, Fut, F> Clone for With<Si, Item, U, Fut, F> +where + Si: Clone, + F: Clone, + Fut: Clone, +{ + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + sink: self.sink.clone(), + f: self.f.clone(), + _phantom: PhantomData, + } + } +} + // Forwarding impl of Stream from the underlying sink impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F> where S: Stream + Sink<Item>, diff --git a/src/sink/with_flat_map.rs b/src/sink/with_flat_map.rs index cf213e6..4b8d3a2 100644 --- a/src/sink/with_flat_map.rs +++ b/src/sink/with_flat_map.rs @@ -1,22 +1,24 @@ use core::fmt; use core::marker::PhantomData; use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method. -#[pin_project] -#[must_use = "sinks do nothing unless polled"] -pub struct WithFlatMap<Si, Item, U, St, F> { - #[pin] - sink: Si, - f: F, - #[pin] - stream: Option<St>, - buffer: Option<Item>, - _marker: PhantomData<fn(U)>, +pin_project! { + /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method. + #[must_use = "sinks do nothing unless polled"] + pub struct WithFlatMap<Si, Item, U, St, F> { + #[pin] + sink: Si, + f: F, + #[pin] + stream: Option<St>, + buffer: Option<Item>, + _marker: PhantomData<fn(U)>, + } } impl<Si, Item, U, St, F> fmt::Debug for WithFlatMap<Si, Item, U, St, F> @@ -41,7 +43,7 @@ where St: Stream<Item = Result<Item, Si::Error>>, { pub(super) fn new(sink: Si, f: F) -> Self { - WithFlatMap { + Self { sink, f, stream: None, diff --git a/src/stream/empty.rs b/src/stream/empty.rs index 903af68..d228b31 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -38,3 +38,9 @@ impl<T> Stream for Empty<T> { (0, Some(0)) } } + +impl<T> Clone for Empty<T> { + fn clone(&self) -> Self { + empty() + } +} diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index 5dbd4ae..eda3b27 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -1,21 +1,26 @@ use crate::stream::{FuturesUnordered, StreamExt}; -use futures_core::future::Future; -use futures_core::stream::Stream; -use futures_core::{FusedStream, task::{Context, Poll}}; -use pin_project::pin_project; +use alloc::collections::binary_heap::{BinaryHeap, PeekMut}; use core::cmp::Ordering; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::pin::Pin; -use alloc::collections::binary_heap::{BinaryHeap, PeekMut}; - -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[derive(Debug)] -struct OrderWrapper<T> { - #[pin] - data: T, // A future or a future's output - index: usize, +use futures_core::future::Future; +use futures_core::ready; +use futures_core::stream::Stream; +use futures_core::{ + task::{Context, Poll}, + FusedStream, +}; +use pin_project_lite::pin_project; + +pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + struct OrderWrapper<T> { + #[pin] + data: T, // A future or a future's output + index: usize, + } } impl<T> PartialEq for OrderWrapper<T> { @@ -40,17 +45,17 @@ impl<T> Ord for OrderWrapper<T> { } impl<T> Future for OrderWrapper<T> - where T: Future +where + T: Future, { type Output = OrderWrapper<T::Output>; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Self::Output> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let index = self.index; - self.project().data.poll(cx) - .map(|output| OrderWrapper { data: output, index }) + self.project().data.poll(cx).map(|output| OrderWrapper { + data: output, + index, + }) } } @@ -104,8 +109,8 @@ impl<Fut: Future> FuturesOrdered<Fut> { /// /// The returned `FuturesOrdered` does not contain any futures and, in this /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`. - pub fn new() -> FuturesOrdered<Fut> { - FuturesOrdered { + pub fn new() -> Self { + Self { in_progress_queue: FuturesUnordered::new(), queued_outputs: BinaryHeap::new(), next_incoming_index: 0, @@ -144,18 +149,15 @@ impl<Fut: Future> FuturesOrdered<Fut> { } impl<Fut: Future> Default for FuturesOrdered<Fut> { - fn default() -> FuturesOrdered<Fut> { - FuturesOrdered::new() + fn default() -> Self { + Self::new() } } impl<Fut: Future> Stream for FuturesOrdered<Fut> { type Item = Fut::Output; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_> - ) -> Poll<Option<Self::Item>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this = &mut *self; // Check to see if we've already received the next value @@ -198,8 +200,11 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { where T: IntoIterator<Item = Fut>, { - let acc = FuturesOrdered::new(); - iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc }) + let acc = Self::new(); + iter.into_iter().fold(acc, |mut acc, item| { + acc.push(item); + acc + }) } } @@ -214,7 +219,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { where I: IntoIterator<Item = Fut>, { - for item in iter.into_iter() { + for item in iter { self.push(item); } } diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 2b7d704..37b7d7e 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -122,8 +122,8 @@ impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> { // run queue if it isn't inserted already. impl<Fut> Default for FuturesUnordered<Fut> { - fn default() -> FuturesUnordered<Fut> { - FuturesUnordered::new() + fn default() -> Self { + Self::new() } } @@ -133,7 +133,7 @@ impl<Fut> FuturesUnordered<Fut> { /// The returned [`FuturesUnordered`] does not contain any futures. /// In this state, [`FuturesUnordered::poll_next`](Stream::poll_next) will /// return [`Poll::Ready(None)`](Poll::Ready). - pub fn new() -> FuturesUnordered<Fut> { + pub fn new() -> Self { let stub = Arc::new(Task { future: UnsafeCell::new(None), next_all: AtomicPtr::new(ptr::null_mut()), @@ -151,7 +151,7 @@ impl<Fut> FuturesUnordered<Fut> { stub, }); - FuturesUnordered { + Self { head_all: AtomicPtr::new(ptr::null_mut()), ready_to_run_queue, is_terminated: AtomicBool::new(false), @@ -610,7 +610,7 @@ impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> { where I: IntoIterator<Item = Fut>, { - let acc = FuturesUnordered::new(); + let acc = Self::new(); iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc }) } } @@ -626,7 +626,7 @@ impl<Fut> Extend<Fut> for FuturesUnordered<Fut> { where I: IntoIterator<Item = Fut>, { - for item in iter.into_iter() { + for item in iter { self.push(item); } } diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs index abb0264..261408f 100644 --- a/src/stream/futures_unordered/task.rs +++ b/src/stream/futures_unordered/task.rs @@ -70,7 +70,7 @@ impl<Fut> ArcWake for Task<Fut> { impl<Fut> Task<Fut> { /// Returns a waker reference for this task without cloning the Arc. - pub(super) fn waker_ref(this: &Arc<Task<Fut>>) -> WakerRef<'_> { + pub(super) fn waker_ref(this: &Arc<Self>) -> WakerRef<'_> { waker_ref(this) } diff --git a/src/stream/iter.rs b/src/stream/iter.rs index e9df81c..cab8cd8 100644 --- a/src/stream/iter.rs +++ b/src/stream/iter.rs @@ -3,7 +3,7 @@ use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; /// Stream for the [`iter`] function. -#[derive(Debug)] +#[derive(Debug, Clone)] #[must_use = "streams do nothing unless polled"] pub struct Iter<I> { iter: I, diff --git a/src/stream/mod.rs b/src/stream/mod.rs index ca9bc89..a5624ba 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -1,8 +1,13 @@ -//! Streams +//! Asynchronous streams. //! -//! This module contains a number of functions for working with `Stream`s, -//! including the [`StreamExt`] trait and the [`TryStreamExt`] trait which add -//! methods to `Stream` types +//! This module contains: +//! +//! - The [`Stream`] trait, for objects that can asynchronously produce a +//! sequence of values. +//! - The [`StreamExt`] and [`TryStreamExt`] trait, which provides adapters for +//! chaining and composing streams. +//! - Top-level stream constructors like [`iter`](iter()) which creates a +//! stream from an iterator. #[cfg(feature = "alloc")] pub use futures_core::stream::{BoxStream, LocalBoxStream}; @@ -13,9 +18,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; #[allow(clippy::module_inception)] mod stream; pub use self::stream::{ - Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, Fuse, - Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, - StreamFuture, Take, TakeWhile, TakeUntil, Then, Zip, + Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, + Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt, + StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, }; #[cfg(feature = "std")] @@ -55,7 +60,7 @@ pub use self::try_stream::IntoAsyncRead; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] -pub use self::try_stream::{TryBufferUnordered, TryForEachConcurrent}; +pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; // Primitive streams @@ -65,6 +70,9 @@ pub use self::iter::{iter, Iter}; mod repeat; pub use self::repeat::{repeat, Repeat}; +mod repeat_with; +pub use self::repeat_with::{repeat_with, RepeatWith}; + mod empty; pub use self::empty::{empty, Empty}; diff --git a/src/stream/once.rs b/src/stream/once.rs index 3a8fef6..318de07 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -1,8 +1,9 @@ use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; /// Creates a stream of a single element. /// @@ -19,13 +20,14 @@ pub fn once<Fut: Future>(future: Fut) -> Once<Fut> { Once::new(future) } -/// A stream which emits single element and then EOF. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Once<Fut> { - #[pin] - future: Option<Fut> +pin_project! { + /// A stream which emits single element and then EOF. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Once<Fut> { + #[pin] + future: Option<Fut> + } } impl<Fut> Once<Fut> { diff --git a/src/stream/pending.rs b/src/stream/pending.rs index bbe0750..ca793c1 100644 --- a/src/stream/pending.rs +++ b/src/stream/pending.rs @@ -36,3 +36,9 @@ impl<T> Stream for Pending<T> { (0, Some(0)) } } + +impl<T> Clone for Pending<T> { + fn clone(&self) -> Self { + pending() + } +} diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index 21749eb..6a2637d 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -3,7 +3,7 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; /// Stream for the [`repeat`] function. -#[derive(Debug)] +#[derive(Debug, Clone)] #[must_use = "streams do nothing unless polled"] pub struct Repeat<T> { item: T, diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs new file mode 100644 index 0000000..eb3313d --- /dev/null +++ b/src/stream/repeat_with.rs @@ -0,0 +1,93 @@ +use core::pin::Pin; +use futures_core::stream::{Stream, FusedStream}; +use futures_core::task::{Context, Poll}; + +/// An stream that repeats elements of type `A` endlessly by +/// applying the provided closure `F: FnMut() -> A`. +/// +/// This `struct` is created by the [`repeat_with()`] function. +/// See its documentation for more. +#[derive(Debug, Clone)] +#[must_use = "streams do nothing unless polled"] +pub struct RepeatWith<F> { + repeater: F, +} + +impl<A, F: FnMut() -> A> Unpin for RepeatWith<F> {} + +impl<A, F: FnMut() -> A> Stream for RepeatWith<F> { + type Item = A; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(Some((&mut self.repeater)())) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + (usize::max_value(), None) + } +} + +impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F> +{ + fn is_terminated(&self) -> bool { + false + } +} + +/// Creates a new stream that repeats elements of type `A` endlessly by +/// applying the provided closure, the repeater, `F: FnMut() -> A`. +/// +/// The `repeat_with()` function calls the repeater over and over again. +/// +/// Infinite stream like `repeat_with()` are often used with adapters like +/// [`stream.take()`], in order to make them finite. +/// +/// If the element type of the stream you need implements [`Clone`], and +/// it is OK to keep the source element in memory, you should instead use +/// the [`stream.repeat()`] function. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::stream::{self, StreamExt}; +/// +/// // let's assume we have some value of a type that is not `Clone` +/// // or which don't want to have in memory just yet because it is expensive: +/// #[derive(PartialEq, Debug)] +/// struct Expensive; +/// +/// // a particular value forever: +/// let mut things = stream::repeat_with(|| Expensive); +/// +/// assert_eq!(Some(Expensive), things.next().await); +/// assert_eq!(Some(Expensive), things.next().await); +/// assert_eq!(Some(Expensive), things.next().await); +/// # }); +/// ``` +/// +/// Using mutation and going finite: +/// +/// ```rust +/// # futures::executor::block_on(async { +/// use futures::stream::{self, StreamExt}; +/// +/// // From the zeroth to the third power of two: +/// let mut curr = 1; +/// let mut pow2 = stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp }) +/// .take(4); +/// +/// assert_eq!(Some(1), pow2.next().await); +/// assert_eq!(Some(2), pow2.next().await); +/// assert_eq!(Some(4), pow2.next().await); +/// assert_eq!(Some(8), pow2.next().await); +/// +/// // ... and now we're done +/// assert_eq!(None, pow2.next().await); +/// # }); +/// ``` +pub fn repeat_with<A, F: FnMut() -> A>(repeater: F) -> RepeatWith<F> { + RepeatWith { repeater } +} diff --git a/src/stream/select.rs b/src/stream/select.rs index 7666386..2b7ebec 100644 --- a/src/stream/select.rs +++ b/src/stream/select.rs @@ -2,18 +2,19 @@ use crate::stream::{StreamExt, Fuse}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`select()`] function. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Select<St1, St2> { - #[pin] - stream1: Fuse<St1>, - #[pin] - stream2: Fuse<St2>, - flag: bool, +pin_project! { + /// Stream for the [`select()`] function. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Select<St1, St2> { + #[pin] + stream1: Fuse<St1>, + #[pin] + stream2: Fuse<St2>, + flag: bool, + } } /// This function will attempt to pull items from both streams. Each diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs index 2547993..00368bb 100644 --- a/src/stream/select_all.rs +++ b/src/stream/select_all.rs @@ -4,6 +4,7 @@ use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; @@ -37,8 +38,8 @@ impl<St: Stream + Unpin> SelectAll<St> { /// /// The returned `SelectAll` does not contain any streams and, in this /// state, `SelectAll::poll` will return `Poll::Ready(None)`. - pub fn new() -> SelectAll<St> { - SelectAll { inner: FuturesUnordered::new() } + pub fn new() -> Self { + Self { inner: FuturesUnordered::new() } } /// Returns the number of streams contained in the set. @@ -65,8 +66,8 @@ impl<St: Stream + Unpin> SelectAll<St> { } impl<St: Stream + Unpin> Default for SelectAll<St> { - fn default() -> SelectAll<St> { - SelectAll::new() + fn default() -> Self { + Self::new() } } diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs index 24f4853..de42cfd 100644 --- a/src/stream/stream/buffer_unordered.rs +++ b/src/stream/stream/buffer_unordered.rs @@ -4,22 +4,23 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use core::fmt; use core::pin::Pin; -/// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) -/// method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct BufferUnordered<St> -where - St: Stream, -{ - #[pin] - stream: Fuse<St>, - in_progress_queue: FuturesUnordered<St::Item>, - max: usize, +pin_project! { + /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) + /// method. + #[must_use = "streams do nothing unless polled"] + pub struct BufferUnordered<St> + where + St: Stream, + { + #[pin] + stream: Fuse<St>, + in_progress_queue: FuturesUnordered<St::Item>, + max: usize, + } } impl<St> fmt::Debug for BufferUnordered<St> @@ -40,12 +41,12 @@ where St: Stream, St::Item: Future, { - pub(super) fn new(stream: St, n: usize) -> BufferUnordered<St> + pub(super) fn new(stream: St, n: usize) -> Self where St: Stream, St::Item: Future, { - BufferUnordered { + Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesUnordered::new(), max: n, diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs index 626ead1..1af9f49 100644 --- a/src/stream/stream/buffered.rs +++ b/src/stream/stream/buffered.rs @@ -1,25 +1,27 @@ use crate::stream::{Fuse, FuturesOrdered, StreamExt}; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use core::fmt; use core::pin::Pin; -/// Stream for the [`buffered`](super::StreamExt::buffered) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct Buffered<St> -where - St: Stream, - St::Item: Future, -{ - #[pin] - stream: Fuse<St>, - in_progress_queue: FuturesOrdered<St::Item>, - max: usize, +pin_project! { + /// Stream for the [`buffered`](super::StreamExt::buffered) method. + #[must_use = "streams do nothing unless polled"] + pub struct Buffered<St> + where + St: Stream, + St::Item: Future, + { + #[pin] + stream: Fuse<St>, + in_progress_queue: FuturesOrdered<St::Item>, + max: usize, + } } impl<St> fmt::Debug for Buffered<St> @@ -41,8 +43,8 @@ where St: Stream, St::Item: Future, { - pub(super) fn new(stream: St, n: usize) -> Buffered<St> { - Buffered { + pub(super) fn new(stream: St, n: usize) -> Self { + Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), max: n, diff --git a/src/stream/stream/catch_unwind.rs b/src/stream/stream/catch_unwind.rs index b9eb4ba..d87a40a 100644 --- a/src/stream/stream/catch_unwind.rs +++ b/src/stream/stream/catch_unwind.rs @@ -1,23 +1,24 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::any::Any; use std::pin::Pin; use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; -/// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct CatchUnwind<St> { - #[pin] - stream: St, - caught_unwind: bool, +pin_project! { + /// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct CatchUnwind<St> { + #[pin] + stream: St, + caught_unwind: bool, + } } impl<St: Stream + UnwindSafe> CatchUnwind<St> { - pub(super) fn new(stream: St) -> CatchUnwind<St> { - CatchUnwind { stream, caught_unwind: false } + pub(super) fn new(stream: St) -> Self { + Self { stream, caught_unwind: false } } delegate_access_inner!(stream, St, ()); diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index c7fbd5f..2be7104 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -1,17 +1,19 @@ use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`chain`](super::StreamExt::chain) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Chain<St1, St2> { - #[pin] - first: Option<St1>, - #[pin] - second: St2, +pin_project! { + /// Stream for the [`chain`](super::StreamExt::chain) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Chain<St1, St2> { + #[pin] + first: Option<St1>, + #[pin] + second: St2, + } } // All interactions with `Pin<&mut Chain<..>>` happen through these methods @@ -19,8 +21,8 @@ impl<St1, St2> Chain<St1, St2> where St1: Stream, St2: Stream<Item = St1::Item>, { - pub(super) fn new(stream1: St1, stream2: St2) -> Chain<St1, St2> { - Chain { + pub(super) fn new(stream1: St1, stream2: St2) -> Self { + Self { first: Some(stream1), second: stream2, } diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs index 9b4ed93..45a3212 100644 --- a/src/stream/stream/chunks.rs +++ b/src/stream/stream/chunks.rs @@ -1,29 +1,31 @@ use crate::stream::Fuse; +use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use core::mem; use core::pin::Pin; use alloc::vec::Vec; -/// Stream for the [`chunks`](super::StreamExt::chunks) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Chunks<St: Stream> { - #[pin] - stream: Fuse<St>, - items: Vec<St::Item>, - cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 +pin_project! { + /// Stream for the [`chunks`](super::StreamExt::chunks) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Chunks<St: Stream> { + #[pin] + stream: Fuse<St>, + items: Vec<St::Item>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } } impl<St: Stream> Chunks<St> where St: Stream { - pub(super) fn new(stream: St, capacity: usize) -> Chunks<St> { + pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Chunks { + Self { stream: super::Fuse::new(stream), items: Vec::with_capacity(capacity), cap: capacity, diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs index 6d07660..774b34b 100644 --- a/src/stream/stream/collect.rs +++ b/src/stream/stream/collect.rs @@ -1,18 +1,20 @@ use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`collect`](super::StreamExt::collect) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Collect<St, C> { - #[pin] - stream: St, - collection: C, +pin_project! { + /// Future for the [`collect`](super::StreamExt::collect) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Collect<St, C> { + #[pin] + stream: St, + collection: C, + } } impl<St: Stream, C: Default> Collect<St, C> { @@ -20,8 +22,8 @@ impl<St: Stream, C: Default> Collect<St, C> { mem::replace(self.project().collection, Default::default()) } - pub(super) fn new(stream: St) -> Collect<St, C> { - Collect { + pub(super) fn new(stream: St) -> Self { + Self { stream, collection: Default::default(), } diff --git a/src/stream/stream/concat.rs b/src/stream/stream/concat.rs index 9b37cd2..ee1349f 100644 --- a/src/stream/stream/concat.rs +++ b/src/stream/stream/concat.rs @@ -1,17 +1,19 @@ use core::pin::Pin; use futures_core::future::{Future, FusedFuture}; +use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`concat`](super::StreamExt::concat) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Concat<St: Stream> { - #[pin] - stream: St, - accum: Option<St::Item>, +pin_project! { + /// Future for the [`concat`](super::StreamExt::concat) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Concat<St: Stream> { + #[pin] + stream: St, + accum: Option<St::Item>, + } } impl<St> Concat<St> @@ -19,8 +21,8 @@ where St: Stream, St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, { - pub(super) fn new(stream: St) -> Concat<St> { - Concat { + pub(super) fn new(stream: St) -> Self { + Self { stream, accum: None, } diff --git a/src/stream/stream/cycle.rs b/src/stream/stream/cycle.rs new file mode 100644 index 0000000..a5b7dc0 --- /dev/null +++ b/src/stream/stream/cycle.rs @@ -0,0 +1,71 @@ +use core::pin::Pin; +use core::usize; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`cycle`](super::StreamExt::cycle) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Cycle<St> { + orig: St, + #[pin] + stream: St, + } +} + +impl<St> Cycle<St> +where + St: Clone + Stream, +{ + pub(super) fn new(stream: St) -> Self { + Self { + orig: stream.clone(), + stream, + } + } +} + +impl<St> Stream for Cycle<St> +where + St: Clone + Stream, +{ + type Item = St::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + + match ready!(this.stream.as_mut().poll_next(cx)) { + None => { + this.stream.set(this.orig.clone()); + this.stream.poll_next(cx) + } + item => Poll::Ready(item), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + // the cycle stream is either empty or infinite + match self.orig.size_hint() { + size @ (0, Some(0)) => size, + (0, _) => (0, None), + _ => (usize::max_value(), None), + } + } +} + +impl<St> FusedStream for Cycle<St> +where + St: Clone + Stream, +{ + fn is_terminated(&self) -> bool { + // the cycle stream is either empty or infinite + if let (0, Some(0)) = self.size_hint() { + true + } else { + false + } + } +} diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index 4e6bac2..7d4c9cb 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -1,23 +1,25 @@ use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`enumerate`](super::StreamExt::enumerate) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Enumerate<St> { - #[pin] - stream: St, - count: usize, +pin_project! { + /// Stream for the [`enumerate`](super::StreamExt::enumerate) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Enumerate<St> { + #[pin] + stream: St, + count: usize, + } } impl<St: Stream> Enumerate<St> { - pub(super) fn new(stream: St) -> Enumerate<St> { - Enumerate { + pub(super) fn new(stream: St) -> Self { + Self { stream, count: 0, } diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 55493fe..57de025 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -1,25 +1,27 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::fns::FnMut1; -/// Stream for the [`filter`](super::StreamExt::filter) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct Filter<St, Fut, F> - where St: Stream, -{ - #[pin] - stream: St, - f: F, - #[pin] - pending_fut: Option<Fut>, - pending_item: Option<St::Item>, +pin_project! { + /// Stream for the [`filter`](super::StreamExt::filter) method. + #[must_use = "streams do nothing unless polled"] + pub struct Filter<St, Fut, F> + where St: Stream, + { + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option<Fut>, + pending_item: Option<St::Item>, + } } impl<St, Fut, F> fmt::Debug for Filter<St, Fut, F> @@ -43,8 +45,8 @@ where St: Stream, F: for<'a> FnMut1<&'a St::Item, Output=Fut>, Fut: Future<Output = bool>, { - pub(super) fn new(stream: St, f: F) -> Filter<St, Fut, F> { - Filter { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, pending_fut: None, diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 50c440f..b762fac 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -1,22 +1,24 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::fns::FnMut1; -/// Stream for the [`filter_map`](super::StreamExt::filter_map) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct FilterMap<St, Fut, F> { - #[pin] - stream: St, - f: F, - #[pin] - pending: Option<Fut>, +pin_project! { + /// Stream for the [`filter_map`](super::StreamExt::filter_map) method. + #[must_use = "streams do nothing unless polled"] + pub struct FilterMap<St, Fut, F> { + #[pin] + stream: St, + f: F, + #[pin] + pending: Option<Fut>, + } } impl<St, Fut, F> fmt::Debug for FilterMap<St, Fut, F> @@ -37,8 +39,8 @@ impl<St, Fut, F> FilterMap<St, Fut, F> F: FnMut(St::Item) -> Fut, Fut: Future, { - pub(super) fn new(stream: St, f: F) -> FilterMap<St, Fut, F> { - FilterMap { stream, f, pending: None } + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, pending: None } } delegate_access_inner!(stream, St, ()); diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 75bbc21..9f6b7a4 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,19 +1,21 @@ use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`flatten`](super::StreamExt::flatten) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Flatten<St, U> { - #[pin] - stream: St, - #[pin] - next: Option<U>, +pin_project! { + /// Stream for the [`flatten`](super::StreamExt::flatten) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Flatten<St, U> { + #[pin] + stream: St, + #[pin] + next: Option<U>, + } } impl<St, U> Flatten<St, U> { diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index 6fce256..e109c3b 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -1,20 +1,22 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`fold`](super::StreamExt::fold) method. -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Fold<St, Fut, T, F> { - #[pin] - stream: St, - f: F, - accum: Option<T>, - #[pin] - future: Option<Fut>, +pin_project! { + /// Future for the [`fold`](super::StreamExt::fold) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Fold<St, Fut, T, F> { + #[pin] + stream: St, + f: F, + accum: Option<T>, + #[pin] + future: Option<Fut>, + } } impl<St, Fut, T, F> fmt::Debug for Fold<St, Fut, T, F> @@ -37,8 +39,8 @@ where St: Stream, F: FnMut(T, St::Item) -> Fut, Fut: Future<Output = T>, { - pub(super) fn new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F> { - Fold { + pub(super) fn new(stream: St, f: F, t: T) -> Self { + Self { stream, f, accum: Some(t), diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index c8af21b..ee90e66 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -1,19 +1,21 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`for_each`](super::StreamExt::for_each) method. -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ForEach<St, Fut, F> { - #[pin] - stream: St, - f: F, - #[pin] - future: Option<Fut>, +pin_project! { + /// Future for the [`for_each`](super::StreamExt::for_each) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct ForEach<St, Fut, F> { + #[pin] + stream: St, + f: F, + #[pin] + future: Option<Fut>, + } } impl<St, Fut, F> fmt::Debug for ForEach<St, Fut, F> @@ -34,8 +36,8 @@ where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future<Output = ()>, { - pub(super) fn new(stream: St, f: F) -> ForEach<St, Fut, F> { - ForEach { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, future: None, diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs index 843ddaa..cee0ba1 100644 --- a/src/stream/stream/for_each_concurrent.rs +++ b/src/stream/stream/for_each_concurrent.rs @@ -5,18 +5,19 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) -/// method. -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ForEachConcurrent<St, Fut, F> { - #[pin] - stream: Option<St>, - f: F, - futures: FuturesUnordered<Fut>, - limit: Option<NonZeroUsize>, +pin_project! { + /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent) + /// method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct ForEachConcurrent<St, Fut, F> { + #[pin] + stream: Option<St>, + f: F, + futures: FuturesUnordered<Fut>, + limit: Option<NonZeroUsize>, + } } impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F> @@ -38,8 +39,8 @@ where St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future<Output = ()>, { - pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> { - ForEachConcurrent { + pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { + Self { stream: Some(stream), // Note: `limit` = 0 gets ignored. limit: limit.and_then(NonZeroUsize::new), diff --git a/src/stream/stream/forward.rs b/src/stream/stream/forward.rs index feef113..2247b21 100644 --- a/src/stream/stream/forward.rs +++ b/src/stream/stream/forward.rs @@ -1,26 +1,29 @@ use crate::stream::Fuse; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`forward`](super::StreamExt::forward) method. -#[pin_project(project = ForwardProj)] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Forward<St, Si, Item> { - #[pin] - sink: Option<Si>, - #[pin] - stream: Fuse<St>, - buffered_item: Option<Item>, +pin_project! { + /// Future for the [`forward`](super::StreamExt::forward) method. + #[project = ForwardProj] + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Forward<St, Si, Item> { + #[pin] + sink: Option<Si>, + #[pin] + stream: Fuse<St>, + buffered_item: Option<Item>, + } } impl<St, Si, Item> Forward<St, Si, Item> { pub(crate) fn new(stream: St, sink: Si) -> Self { - Forward { + Self { sink: Some(sink), stream: Fuse::new(stream), buffered_item: None, diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 408ad81..e1d8c12 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -1,23 +1,25 @@ use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`fuse`](super::StreamExt::fuse) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Fuse<St> { - #[pin] - stream: St, - done: bool, +pin_project! { + /// Stream for the [`fuse`](super::StreamExt::fuse) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Fuse<St> { + #[pin] + stream: St, + done: bool, + } } impl<St> Fuse<St> { - pub(super) fn new(stream: St) -> Fuse<St> { - Fuse { stream, done: false } + pub(super) fn new(stream: St) -> Self { + Self { stream, done: false } } /// Returns whether the underlying stream has finished or not. diff --git a/src/stream/stream/into_future.rs b/src/stream/stream/into_future.rs index 8aa2b1e..a9a1e23 100644 --- a/src/stream/stream/into_future.rs +++ b/src/stream/stream/into_future.rs @@ -1,6 +1,7 @@ use crate::stream::StreamExt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; @@ -12,8 +13,8 @@ pub struct StreamFuture<St> { } impl<St: Stream + Unpin> StreamFuture<St> { - pub(super) fn new(stream: St) -> StreamFuture<St> { - StreamFuture { stream: Some(stream) } + pub(super) fn new(stream: St) -> Self { + Self { stream: Some(stream) } } /// Acquires a reference to the underlying stream that this combinator is diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index c877512..1a269f0 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -1,20 +1,22 @@ use core::fmt; use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::fns::FnMut1; -/// Stream for the [`map`](super::StreamExt::map) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct Map<St, F> { - #[pin] - stream: St, - f: F, +pin_project! { + /// Stream for the [`map`](super::StreamExt::map) method. + #[must_use = "streams do nothing unless polled"] + pub struct Map<St, F> { + #[pin] + stream: St, + f: F, + } } impl<St, F> fmt::Debug for Map<St, F> @@ -29,8 +31,8 @@ where } impl<St, F> Map<St, F> { - pub(crate) fn new(stream: St, f: F) -> Map<St, F> { - Map { stream, f } + pub(crate) fn new(stream: St, f: F) -> Self { + Self { stream, f } } delegate_access_inner!(stream, St, ()); diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index f988468..b1b4384 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -29,10 +29,18 @@ mod collect; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::collect::Collect; +mod unzip; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::unzip::Unzip; + mod concat; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::concat::Concat; +mod cycle; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::cycle::Cycle; + mod enumerate; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::enumerate::Enumerate; @@ -473,6 +481,45 @@ pub trait StreamExt: Stream { assert_future::<C, _>(Collect::new(self)) } + /// Converts a stream of pairs into a future, which + /// resolves to pair of containers. + /// + /// `unzip()` produces a future, which resolves to two + /// collections: one from the left elements of the pairs, + /// and one from the right elements. + /// + /// The returned future will be resolved when the stream terminates. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::StreamExt; + /// use std::thread; + /// + /// let (tx, rx) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx.unbounded_send((1, 2)).unwrap(); + /// tx.unbounded_send((3, 4)).unwrap(); + /// tx.unbounded_send((5, 6)).unwrap(); + /// }); + /// + /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await; + /// assert_eq!(o1, vec![1, 3, 5]); + /// assert_eq!(o2, vec![2, 4, 6]); + /// # }); + /// ``` + fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> + where + FromA: Default + Extend<A>, + FromB: Default + Extend<B>, + Self: Sized + Stream<Item = (A, B)>, + { + assert_future::<(FromA, FromB), _>(Unzip::new(self)) + } + /// Concatenate all items of a stream into a single extendable /// destination, returning a future representing the end result. /// @@ -513,6 +560,36 @@ pub trait StreamExt: Stream { assert_future::<Self::Item, _>(Concat::new(self)) } + /// Repeats a stream endlessly. + /// + /// The stream never terminates. Note that you likely want to avoid + /// usage of `collect` or such on the returned stream as it will exhaust + /// available memory as it tries to just fill up all RAM. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// let a = [1, 2, 3]; + /// let mut s = stream::iter(a.iter()).cycle(); + /// + /// assert_eq!(s.next().await, Some(&1)); + /// assert_eq!(s.next().await, Some(&2)); + /// assert_eq!(s.next().await, Some(&3)); + /// assert_eq!(s.next().await, Some(&1)); + /// assert_eq!(s.next().await, Some(&2)); + /// assert_eq!(s.next().await, Some(&3)); + /// assert_eq!(s.next().await, Some(&1)); + /// # }); + /// ``` + fn cycle(self) -> Cycle<Self> + where + Self: Sized + Clone, + { + assert_stream::<Self::Item, _>(Cycle::new(self)) + } + /// Execute an accumulating asynchronous computation over a stream, /// collecting all the values into one final result. /// diff --git a/src/stream/stream/next.rs b/src/stream/stream/next.rs index 2d74632..6949878 100644 --- a/src/stream/stream/next.rs +++ b/src/stream/stream/next.rs @@ -15,7 +15,7 @@ impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { - Next { stream } + Self { stream } } } diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs index 1d8c342..a403110 100644 --- a/src/stream/stream/peek.rs +++ b/src/stream/stream/peek.rs @@ -2,29 +2,31 @@ use crate::stream::{Fuse, StreamExt}; use core::fmt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; - -/// A `Stream` that implements a `peek` method. -/// -/// The `peek` method can be used to retrieve a reference -/// to the next `Stream::Item` if available. A subsequent -/// call to `poll` will return the owned item. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Peekable<St: Stream> { - #[pin] - stream: Fuse<St>, - peeked: Option<St::Item>, +use pin_project_lite::pin_project; + +pin_project! { + /// A `Stream` that implements a `peek` method. + /// + /// The `peek` method can be used to retrieve a reference + /// to the next `Stream::Item` if available. A subsequent + /// call to `poll` will return the owned item. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Peekable<St: Stream> { + #[pin] + stream: Fuse<St>, + peeked: Option<St::Item>, + } } impl<St: Stream> Peekable<St> { - pub(super) fn new(stream: St) -> Peekable<St> { - Peekable { + pub(super) fn new(stream: St) -> Self { + Self { stream: stream.fuse(), peeked: None, } @@ -100,11 +102,12 @@ where delegate_sink!(stream, Item); } -/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] -#[pin_project] -#[must_use = "futures do nothing unless polled"] -pub struct Peek<'a, St: Stream> { - inner: Option<Pin<&'a mut Peekable<St>>>, +pin_project! { + /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`] + #[must_use = "futures do nothing unless polled"] + pub struct Peek<'a, St: Stream> { + inner: Option<Pin<&'a mut Peekable<St>>>, + } } impl<St> fmt::Debug for Peek<'_, St> diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs index 192d3c6..b6e3e5c 100644 --- a/src/stream/stream/ready_chunks.rs +++ b/src/stream/stream/ready_chunks.rs @@ -3,27 +3,28 @@ use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use core::mem; use core::pin::Pin; use alloc::vec::Vec; -/// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct ReadyChunks<St: Stream> { - #[pin] - stream: Fuse<St>, - items: Vec<St::Item>, - cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 +pin_project! { + /// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct ReadyChunks<St: Stream> { + #[pin] + stream: Fuse<St>, + items: Vec<St::Item>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } } impl<St: Stream> ReadyChunks<St> where St: Stream { - pub(super) fn new(stream: St, capacity: usize) -> ReadyChunks<St> { + pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - ReadyChunks { + Self { stream: super::Fuse::new(stream), items: Vec::with_capacity(capacity), cap: capacity, diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index dd0316d..2097280 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -1,26 +1,28 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; struct StateFn<S, F> { state: S, f: F, } -/// Stream for the [`scan`](super::StreamExt::scan) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct Scan<St: Stream, S, Fut, F> { - #[pin] - stream: St, - state_f: Option<StateFn<S, F>>, - #[pin] - future: Option<Fut>, +pin_project! { + /// Stream for the [`scan`](super::StreamExt::scan) method. + #[must_use = "streams do nothing unless polled"] + pub struct Scan<St: Stream, S, Fut, F> { + #[pin] + stream: St, + state_f: Option<StateFn<S, F>>, + #[pin] + future: Option<Fut>, + } } impl<St, S, Fut, F> fmt::Debug for Scan<St, S, Fut, F> @@ -53,8 +55,8 @@ where F: FnMut(&mut S, St::Item) -> Fut, Fut: Future<Output = Option<B>>, { - pub(super) fn new(stream: St, initial_state: S, f: F) -> Scan<St, S, Fut, F> { - Scan { + pub(super) fn new(stream: St, initial_state: S, f: F) -> Self { + Self { stream, state_f: Some(StateFn { state: initial_state, diff --git a/src/stream/stream/select_next_some.rs b/src/stream/stream/select_next_some.rs index 884f252..fe7a089 100644 --- a/src/stream/stream/select_next_some.rs +++ b/src/stream/stream/select_next_some.rs @@ -1,4 +1,5 @@ use core::pin::Pin; +use futures_core::ready; use futures_core::stream::FusedStream; use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; @@ -14,7 +15,7 @@ pub struct SelectNextSome<'a, St: ?Sized> { impl<'a, St: ?Sized> SelectNextSome<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { - SelectNextSome { stream } + Self { stream } } } diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index ea31b17..6ffcf57 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -1,23 +1,25 @@ use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`skip`](super::StreamExt::skip) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Skip<St> { - #[pin] - stream: St, - remaining: usize, +pin_project! { + /// Stream for the [`skip`](super::StreamExt::skip) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Skip<St> { + #[pin] + stream: St, + remaining: usize, + } } impl<St: Stream> Skip<St> { - pub(super) fn new(stream: St, n: usize) -> Skip<St> { - Skip { + pub(super) fn new(stream: St, n: usize) -> Self { + Self { stream, remaining: n, } diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 2c58102..e1aa3f9 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -1,23 +1,25 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`skip_while`](super::StreamExt::skip_while) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct SkipWhile<St, Fut, F> where St: Stream { - #[pin] - stream: St, - f: F, - #[pin] - pending_fut: Option<Fut>, - pending_item: Option<St::Item>, - done_skipping: bool, +pin_project! { + /// Stream for the [`skip_while`](super::StreamExt::skip_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct SkipWhile<St, Fut, F> where St: Stream { + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option<Fut>, + pending_item: Option<St::Item>, + done_skipping: bool, + } } impl<St, Fut, F> fmt::Debug for SkipWhile<St, Fut, F> @@ -41,8 +43,8 @@ impl<St, Fut, F> SkipWhile<St, Fut, F> F: FnMut(&St::Item) -> Fut, Fut: Future<Output = bool>, { - pub(super) fn new(stream: St, f: F) -> SkipWhile<St, Fut, F> { - SkipWhile { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, pending_fut: None, diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs index 991eb16..997b974 100644 --- a/src/stream/stream/split.rs +++ b/src/stream/stream/split.rs @@ -1,3 +1,4 @@ +use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 1956a82..124d397 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -1,24 +1,26 @@ use core::cmp; use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`take`](super::StreamExt::take) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Take<St> { - #[pin] - stream: St, - remaining: usize, +pin_project! { + /// Stream for the [`take`](super::StreamExt::take) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Take<St> { + #[pin] + stream: St, + remaining: usize, + } } impl<St: Stream> Take<St> { - pub(super) fn new(stream: St, n: usize) -> Take<St> { - Take { + pub(super) fn new(stream: St, n: usize) -> Self { + Self { stream, remaining: n, } diff --git a/src/stream/stream/take_until.rs b/src/stream/stream/take_until.rs index 4e32ad8..4dea01a 100644 --- a/src/stream/stream/take_until.rs +++ b/src/stream/stream/take_until.rs @@ -1,28 +1,30 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; // FIXME: docs, tests -/// Stream for the [`take_until`](super::StreamExt::take_until) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct TakeUntil<St: Stream, Fut: Future> { - #[pin] - stream: St, - /// Contains the inner Future on start and None once the inner Future is resolved - /// or taken out by the user. - #[pin] - fut: Option<Fut>, - /// Contains fut's return value once fut is resolved - fut_result: Option<Fut::Output>, - /// Whether the future was taken out by the user. - free: bool, +pin_project! { + /// Stream for the [`take_until`](super::StreamExt::take_until) method. + #[must_use = "streams do nothing unless polled"] + pub struct TakeUntil<St: Stream, Fut: Future> { + #[pin] + stream: St, + // Contains the inner Future on start and None once the inner Future is resolved + // or taken out by the user. + #[pin] + fut: Option<Fut>, + // Contains fut's return value once fut is resolved + fut_result: Option<Fut::Output>, + // Whether the future was taken out by the user. + free: bool, + } } impl<St, Fut> fmt::Debug for TakeUntil<St, Fut> @@ -44,8 +46,8 @@ where St: Stream, Fut: Future, { - pub(super) fn new(stream: St, fut: Fut) -> TakeUntil<St, Fut> { - TakeUntil { + pub(super) fn new(stream: St, fut: Fut) -> Self { + Self { stream, fut: Some(fut), fut_result: None, @@ -73,7 +75,7 @@ where /// This may be used to retrieve arbitrary data from the stopping /// future, for example a reason why the stream was stopped. /// - /// This method will return `None` if the future isn't resovled yet, + /// This method will return `None` if the future isn't resolved yet, /// or if the result was already taken out. /// /// # Examples diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index e3a6d00..4cdba83 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -1,23 +1,25 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`take_while`](super::StreamExt::take_while) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct TakeWhile<St: Stream, Fut, F> { - #[pin] - stream: St, - f: F, - #[pin] - pending_fut: Option<Fut>, - pending_item: Option<St::Item>, - done_taking: bool, +pin_project! { + /// Stream for the [`take_while`](super::StreamExt::take_while) method. + #[must_use = "streams do nothing unless polled"] + pub struct TakeWhile<St: Stream, Fut, F> { + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option<Fut>, + pending_item: Option<St::Item>, + done_taking: bool, + } } impl<St, Fut, F> fmt::Debug for TakeWhile<St, Fut, F> @@ -41,8 +43,8 @@ impl<St, Fut, F> TakeWhile<St, Fut, F> F: FnMut(&St::Item) -> Fut, Fut: Future<Output = bool>, { - pub(super) fn new(stream: St, f: F) -> TakeWhile<St, Fut, F> { - TakeWhile { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, pending_fut: None, diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs index 1334a6c..3d42bdd 100644 --- a/src/stream/stream/then.rs +++ b/src/stream/stream/then.rs @@ -1,21 +1,23 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`then`](super::StreamExt::then) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct Then<St, Fut, F> { - #[pin] - stream: St, - #[pin] - future: Option<Fut>, - f: F, +pin_project! { + /// Stream for the [`then`](super::StreamExt::then) method. + #[must_use = "streams do nothing unless polled"] + pub struct Then<St, Fut, F> { + #[pin] + stream: St, + #[pin] + future: Option<Fut>, + f: F, + } } impl<St, Fut, F> fmt::Debug for Then<St, Fut, F> @@ -35,8 +37,8 @@ impl<St, Fut, F> Then<St, Fut, F> where St: Stream, F: FnMut(St::Item) -> Fut, { - pub(super) fn new(stream: St, f: F) -> Then<St, Fut, F> { - Then { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, future: None, f, diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs new file mode 100644 index 0000000..5024770 --- /dev/null +++ b/src/stream/stream/unzip.rs @@ -0,0 +1,68 @@ +use core::mem; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use pin_project_lite::pin_project; + +pin_project! { + /// Future for the [`unzip`](super::StreamExt::unzip) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Unzip<St, FromA, FromB> { + #[pin] + stream: St, + left: FromA, + right: FromB, + } +} + +impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> { + fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { + let this = self.project(); + ( + mem::replace(this.left, Default::default()), + mem::replace(this.right, Default::default()), + ) + } + + pub(super) fn new(stream: St) -> Self { + Self { + stream, + left: Default::default(), + right: Default::default(), + } + } +} + +impl<St, A, B, FromA, FromB> FusedFuture for Unzip<St, FromA, FromB> +where St: FusedStream<Item = (A, B)>, + FromA: Default + Extend<A>, + FromB: Default + Extend<B>, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +impl<St, A, B, FromA, FromB> Future for Unzip<St, FromA, FromB> +where St: Stream<Item = (A, B)>, + FromA: Default + Extend<A>, + FromB: Default + Extend<B>, +{ + type Output = (FromA, FromB); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)> { + let mut this = self.as_mut().project(); + loop { + match ready!(this.stream.as_mut().poll_next(cx)) { + Some(e) => { + this.left.extend(Some(e.0)); + this.right.extend(Some(e.1)); + }, + None => return Poll::Ready(self.finish()), + } + } + } +} diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 522f7e1..588531a 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -3,24 +3,25 @@ use core::cmp; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; - -/// Stream for the [`zip`](super::StreamExt::zip) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Zip<St1: Stream, St2: Stream> { - #[pin] - stream1: Fuse<St1>, - #[pin] - stream2: Fuse<St2>, - queued1: Option<St1::Item>, - queued2: Option<St2::Item>, +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`zip`](super::StreamExt::zip) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Zip<St1: Stream, St2: Stream> { + #[pin] + stream1: Fuse<St1>, + #[pin] + stream2: Fuse<St2>, + queued1: Option<St1::Item>, + queued2: Option<St2::Item>, + } } impl<St1: Stream, St2: Stream> Zip<St1, St2> { - pub(super) fn new(stream1: St1, stream2: St2) -> Zip<St1, St2> { - Zip { + pub(super) fn new(stream1: St1, stream2: St2) -> Self { + Self { stream1: stream1.fuse(), stream2: stream2.fuse(), queued1: None, diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs index 4d24c4f..b185646 100644 --- a/src/stream/try_stream/and_then.rs +++ b/src/stream/try_stream/and_then.rs @@ -1,21 +1,23 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; +use futures_core::ready; use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`and_then`](super::TryStreamExt::and_then) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct AndThen<St, Fut, F> { - #[pin] - stream: St, - #[pin] - future: Option<Fut>, - f: F, +pin_project! { + /// Stream for the [`and_then`](super::TryStreamExt::and_then) method. + #[must_use = "streams do nothing unless polled"] + pub struct AndThen<St, Fut, F> { + #[pin] + stream: St, + #[pin] + future: Option<Fut>, + f: F, + } } impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F> diff --git a/src/stream/try_stream/into_async_read.rs b/src/stream/try_stream/into_async_read.rs index 10291b0..197c105 100644 --- a/src/stream/try_stream/into_async_read.rs +++ b/src/stream/try_stream/into_async_read.rs @@ -1,5 +1,6 @@ use crate::stream::TryStreamExt; use core::pin::Pin; +use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; use futures_io::{AsyncRead, AsyncWrite, AsyncBufRead}; @@ -39,7 +40,7 @@ where St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { - IntoAsyncRead { + Self { stream, state: ReadState::PendingChunk, } diff --git a/src/stream/try_stream/into_stream.rs b/src/stream/try_stream/into_stream.rs index 370a327..89bc3ef 100644 --- a/src/stream/try_stream/into_stream.rs +++ b/src/stream/try_stream/into_stream.rs @@ -3,21 +3,22 @@ use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; - -/// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct IntoStream<St> { - #[pin] - stream: St, +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct IntoStream<St> { + #[pin] + stream: St, + } } impl<St> IntoStream<St> { #[inline] pub(super) fn new(stream: St) -> Self { - IntoStream { stream } + Self { stream } } delegate_access_inner!(stream, St, ()); diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs index 0c8e108..6a48a4c 100644 --- a/src/stream/try_stream/mod.rs +++ b/src/stream/try_stream/mod.rs @@ -115,6 +115,12 @@ cfg_target_has_atomic! { pub use self::try_buffer_unordered::TryBufferUnordered; #[cfg(feature = "alloc")] + mod try_buffered; + #[cfg(feature = "alloc")] + #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 + pub use self::try_buffered::TryBuffered; + + #[cfg(feature = "alloc")] mod try_for_each_concurrent; #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 @@ -773,7 +779,7 @@ pub trait TryStreamExt: TryStream { assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self)) } - /// Attempt to execute several futures from a stream concurrently. + /// Attempt to execute several futures from a stream concurrently (unordered). /// /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type /// that matches the stream's `Error` type. @@ -842,6 +848,80 @@ pub trait TryStreamExt: TryStream { ) } + /// Attempt to execute several futures from a stream concurrently. + /// + /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type + /// that matches the stream's `Error` type. + /// + /// This adaptor will buffer up to `n` futures and then return their + /// outputs in the order. If the underlying stream returns an error, it will + /// be immediately propagated. + /// + /// The returned stream will be a stream of results, each containing either + /// an error or a future's output. An error can be produced either by the + /// underlying stream itself or by one of the futures it yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// Results are returned in the order of addition: + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::oneshot; + /// use futures::future::lazy; + /// use futures::stream::{self, StreamExt, TryStreamExt}; + /// + /// let (send_one, recv_one) = oneshot::channel(); + /// let (send_two, recv_two) = oneshot::channel(); + /// + /// let mut buffered = lazy(move |cx| { + /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); + /// + /// let mut buffered = stream_of_futures.try_buffered(10); + /// + /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); + /// + /// send_two.send(2i32)?; + /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); + /// Ok::<_, i32>(buffered) + /// }).await?; + /// + /// send_one.send(1i32)?; + /// assert_eq!(buffered.next().await, Some(Ok(1i32))); + /// assert_eq!(buffered.next().await, Some(Ok(2i32))); + /// + /// assert_eq!(buffered.next().await, None); + /// # Ok::<(), i32>(()) }).unwrap(); + /// ``` + /// + /// Errors from the underlying stream itself are propagated: + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::{StreamExt, TryStreamExt}; + /// + /// let (sink, stream_of_futures) = mpsc::unbounded(); + /// let mut buffered = stream_of_futures.try_buffered(10); + /// + /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; + /// assert_eq!(buffered.next().await, Some(Ok(7i32))); + /// + /// sink.unbounded_send(Err("error in the stream"))?; + /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + fn try_buffered(self, n: usize) -> TryBuffered<Self> + where + Self::Ok: TryFuture<Error = Self::Error>, + Self: Sized, + { + TryBuffered::new(self, n) + } + // TODO: false positive warning from rustdoc. Verify once #43466 settles // /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`] diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs index d972c0f..999123a 100644 --- a/src/stream/try_stream/or_else.rs +++ b/src/stream/try_stream/or_else.rs @@ -1,21 +1,23 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; +use futures_core::ready; use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`or_else`](super::TryStreamExt::or_else) method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct OrElse<St, Fut, F> { - #[pin] - stream: St, - #[pin] - future: Option<Fut>, - f: F, +pin_project! { + /// Stream for the [`or_else`](super::TryStreamExt::or_else) method. + #[must_use = "streams do nothing unless polled"] + pub struct OrElse<St, Fut, F> { + #[pin] + stream: St, + #[pin] + future: Option<Fut>, + f: F, + } } impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F> diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs index c1c931a..71c6fc7 100644 --- a/src/stream/try_stream/try_buffer_unordered.rs +++ b/src/stream/try_stream/try_buffer_unordered.rs @@ -5,21 +5,22 @@ use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; use core::pin::Pin; -/// Stream for the -/// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct TryBufferUnordered<St> - where St: TryStream -{ - #[pin] - stream: Fuse<IntoStream<St>>, - in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>, - max: usize, +pin_project! { + /// Stream for the + /// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryBufferUnordered<St> + where St: TryStream + { + #[pin] + stream: Fuse<IntoStream<St>>, + in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>, + max: usize, + } } impl<St> TryBufferUnordered<St> @@ -27,7 +28,7 @@ impl<St> TryBufferUnordered<St> St::Ok: TryFuture, { pub(super) fn new(stream: St, n: usize) -> Self { - TryBufferUnordered { + Self { stream: IntoStream::new(stream).fuse(), in_progress_queue: FuturesUnordered::new(), max: n, diff --git a/src/stream/try_stream/try_buffered.rs b/src/stream/try_stream/try_buffered.rs new file mode 100644 index 0000000..ff7e844 --- /dev/null +++ b/src/stream/try_stream/try_buffered.rs @@ -0,0 +1,90 @@ +use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream}; +use crate::future::{IntoFuture, TryFutureExt}; +use futures_core::future::TryFuture; +use futures_core::stream::{Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; +use core::pin::Pin; + +pin_project! { + /// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryBuffered<St> + where + St: TryStream, + St::Ok: TryFuture, + { + #[pin] + stream: Fuse<IntoStream<St>>, + in_progress_queue: FuturesOrdered<IntoFuture<St::Ok>>, + max: usize, + } +} + +impl<St> TryBuffered<St> +where + St: TryStream, + St::Ok: TryFuture, +{ + pub(super) fn new(stream: St, n: usize) -> Self { + Self { + stream: IntoStream::new(stream).fuse(), + in_progress_queue: FuturesOrdered::new(), + max: n, + } + } + + delegate_access_inner!(stream, St, (. .)); +} + +impl<St> Stream for TryBuffered<St> +where + St: TryStream, + St::Ok: TryFuture<Error = St::Error>, +{ + type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let mut this = self.project(); + + // First up, try to spawn off as many futures as possible by filling up + // our queue of futures. Propagate errors from the stream immediately. + while this.in_progress_queue.len() < *this.max { + match this.stream.as_mut().poll_next(cx)? { + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(None) | Poll::Pending => break, + } + } + + // Attempt to pull the next value from the in_progress_queue + match this.in_progress_queue.poll_next_unpin(cx) { + x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, + Poll::Ready(None) => {} + } + + // If more values are still coming from the stream, we're not done yet + if this.stream.is_done() { + Poll::Ready(None) + } else { + Poll::Pending + } + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<S, Item, E> Sink<Item> for TryBuffered<S> +where + S: TryStream + Sink<Item, Error = E>, + S::Ok: TryFuture<Error = E>, +{ + type Error = E; + + delegate_sink!(stream, Item); +} diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs index 76ce619..387de97 100644 --- a/src/stream/try_stream/try_collect.rs +++ b/src/stream/try_stream/try_collect.rs @@ -1,23 +1,25 @@ use core::mem; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; +use futures_core::ready; use futures_core::stream::{FusedStream, TryStream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`try_collect`](super::TryStreamExt::try_collect) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct TryCollect<St, C> { - #[pin] - stream: St, - items: C, +pin_project! { + /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryCollect<St, C> { + #[pin] + stream: St, + items: C, + } } impl<St: TryStream, C: Default> TryCollect<St, C> { - pub(super) fn new(s: St) -> TryCollect<St, C> { - TryCollect { + pub(super) fn new(s: St) -> Self { + Self { stream: s, items: Default::default(), } diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs index 235a2c5..2451332 100644 --- a/src/stream/try_stream/try_concat.rs +++ b/src/stream/try_stream/try_concat.rs @@ -1,17 +1,19 @@ use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct TryConcat<St: TryStream> { - #[pin] - stream: St, - accum: Option<St::Ok>, +pin_project! { + /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryConcat<St: TryStream> { + #[pin] + stream: St, + accum: Option<St::Ok>, + } } impl<St> TryConcat<St> @@ -19,8 +21,8 @@ where St: TryStream, St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default, { - pub(super) fn new(stream: St) -> TryConcat<St> { - TryConcat { + pub(super) fn new(stream: St) -> Self { + Self { stream, accum: None, } diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs index 38d060c..eacefd2 100644 --- a/src/stream/try_stream/try_filter.rs +++ b/src/stream/try_stream/try_filter.rs @@ -1,25 +1,27 @@ use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`try_filter`](super::TryStreamExt::try_filter) -/// method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct TryFilter<St, Fut, F> - where St: TryStream -{ - #[pin] - stream: St, - f: F, - #[pin] - pending_fut: Option<Fut>, - pending_item: Option<St::Ok>, +pin_project! { + /// Stream for the [`try_filter`](super::TryStreamExt::try_filter) + /// method. + #[must_use = "streams do nothing unless polled"] + pub struct TryFilter<St, Fut, F> + where St: TryStream + { + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option<Fut>, + pending_item: Option<St::Ok>, + } } impl<St, Fut, F> fmt::Debug for TryFilter<St, Fut, F> @@ -41,7 +43,7 @@ impl<St, Fut, F> TryFilter<St, Fut, F> where St: TryStream { pub(super) fn new(stream: St, f: F) -> Self { - TryFilter { + Self { stream, f, pending_fut: None, diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs index 6c1e48f..335649b 100644 --- a/src/stream/try_stream/try_filter_map.rs +++ b/src/stream/try_stream/try_filter_map.rs @@ -1,22 +1,24 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{TryFuture}; +use futures_core::ready; use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map) -/// method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct TryFilterMap<St, Fut, F> { - #[pin] - stream: St, - f: F, - #[pin] - pending: Option<Fut>, +pin_project! { + /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map) + /// method. + #[must_use = "streams do nothing unless polled"] + pub struct TryFilterMap<St, Fut, F> { + #[pin] + stream: St, + f: F, + #[pin] + pending: Option<Fut>, + } } impl<St, Fut, F> fmt::Debug for TryFilterMap<St, Fut, F> @@ -34,7 +36,7 @@ where impl<St, Fut, F> TryFilterMap<St, Fut, F> { pub(super) fn new(stream: St, f: F) -> Self { - TryFilterMap { stream, f, pending: None } + Self { stream, f, pending: None } } delegate_access_inner!(stream, St, ()); @@ -66,8 +68,9 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F> Poll::Ready(loop { if let Some(p) = this.pending.as_mut().as_pin_mut() { // We have an item in progress, poll that until it's done - let item = ready!(p.try_poll(cx)?); + let res = ready!(p.try_poll(cx)); this.pending.set(None); + let item = res?; if item.is_some() { break item.map(Ok); } diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs index 5227903..4fc04a0 100644 --- a/src/stream/try_stream/try_flatten.rs +++ b/src/stream/try_stream/try_flatten.rs @@ -1,22 +1,24 @@ use core::pin::Pin; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method. -#[pin_project] -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct TryFlatten<St> -where - St: TryStream, -{ - #[pin] - stream: St, - #[pin] - next: Option<St::Ok>, +pin_project! { + /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryFlatten<St> + where + St: TryStream, + { + #[pin] + stream: St, + #[pin] + next: Option<St::Ok>, + } } impl<St> TryFlatten<St> diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs index abeeea4..1d41e4b 100644 --- a/src/stream/try_stream/try_fold.rs +++ b/src/stream/try_stream/try_fold.rs @@ -1,20 +1,22 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct TryFold<St, Fut, T, F> { - #[pin] - stream: St, - f: F, - accum: Option<T>, - #[pin] - future: Option<Fut>, +pin_project! { + /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryFold<St, Fut, T, F> { + #[pin] + stream: St, + f: F, + accum: Option<T>, + #[pin] + future: Option<Fut>, + } } impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F> @@ -37,8 +39,8 @@ where St: TryStream, F: FnMut(T, St::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = St::Error>, { - pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> { - TryFold { + pub(super) fn new(stream: St, f: F, t: T) -> Self { + Self { stream, f, accum: Some(t), diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs index a00e911..0a814ae 100644 --- a/src/stream/try_stream/try_for_each.rs +++ b/src/stream/try_stream/try_for_each.rs @@ -1,19 +1,21 @@ use core::fmt; use core::pin::Pin; use futures_core::future::{Future, TryFuture}; +use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method. -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct TryForEach<St, Fut, F> { - #[pin] - stream: St, - f: F, - #[pin] - future: Option<Fut>, +pin_project! { + /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryForEach<St, Fut, F> { + #[pin] + stream: St, + f: F, + #[pin] + future: Option<Fut>, + } } impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F> @@ -34,8 +36,8 @@ where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = St::Error>, { - pub(super) fn new(stream: St, f: F) -> TryForEach<St, Fut, F> { - TryForEach { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, future: None, diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs index db8e505..d2f4b0f 100644 --- a/src/stream/try_stream/try_for_each_concurrent.rs +++ b/src/stream/try_stream/try_for_each_concurrent.rs @@ -6,19 +6,20 @@ use core::num::NonZeroUsize; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Future for the -/// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) -/// method. -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct TryForEachConcurrent<St, Fut, F> { - #[pin] - stream: Option<St>, - f: F, - futures: FuturesUnordered<Fut>, - limit: Option<NonZeroUsize>, +pin_project! { + /// Future for the + /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent) + /// method. + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct TryForEachConcurrent<St, Fut, F> { + #[pin] + stream: Option<St>, + f: F, + futures: FuturesUnordered<Fut>, + limit: Option<NonZeroUsize>, + } } impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F> @@ -50,8 +51,8 @@ where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: Future<Output = Result<(), St::Error>>, { - pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> { - TryForEachConcurrent { + pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self { + Self { stream: Some(stream), // Note: `limit` = 0 gets ignored. limit: limit.and_then(NonZeroUsize::new), diff --git a/src/stream/try_stream/try_next.rs b/src/stream/try_stream/try_next.rs index 78599ad..1bc00fb 100644 --- a/src/stream/try_stream/try_next.rs +++ b/src/stream/try_stream/try_next.rs @@ -15,7 +15,7 @@ impl<St: ?Sized + Unpin> Unpin for TryNext<'_, St> {} impl<'a, St: ?Sized + TryStream + Unpin> TryNext<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { - TryNext { stream } + Self { stream } } } diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs index 35759d0..0603b10 100644 --- a/src/stream/try_stream/try_skip_while.rs +++ b/src/stream/try_stream/try_skip_while.rs @@ -1,24 +1,26 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; +use futures_core::ready; use futures_core::stream::{Stream, TryStream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while) -/// method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct TrySkipWhile<St, Fut, F> where St: TryStream { - #[pin] - stream: St, - f: F, - #[pin] - pending_fut: Option<Fut>, - pending_item: Option<St::Ok>, - done_skipping: bool, +pin_project! { + /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while) + /// method. + #[must_use = "streams do nothing unless polled"] + pub struct TrySkipWhile<St, Fut, F> where St: TryStream { + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option<Fut>, + pending_item: Option<St::Ok>, + done_skipping: bool, + } } impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F> @@ -42,8 +44,8 @@ impl<St, Fut, F> TrySkipWhile<St, Fut, F> F: FnMut(&St::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = St::Error>, { - pub(super) fn new(stream: St, f: F) -> TrySkipWhile<St, Fut, F> { - TrySkipWhile { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, pending_fut: None, @@ -74,9 +76,10 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F> Poll::Ready(loop { if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { - let skipped = ready!(fut.try_poll(cx)?); - let item = this.pending_item.take(); + let res = ready!(fut.try_poll(cx)); this.pending_fut.set(None); + let skipped = res?; + let item = this.pending_item.take(); if !skipped { *this.done_skipping = true; break item.map(Ok); diff --git a/src/stream/try_stream/try_take_while.rs b/src/stream/try_stream/try_take_while.rs index 16bfb20..6241572 100644 --- a/src/stream/try_stream/try_take_while.rs +++ b/src/stream/try_stream/try_take_while.rs @@ -1,27 +1,29 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; -use pin_project::pin_project; +use pin_project_lite::pin_project; -/// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while) -/// method. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct TryTakeWhile<St, Fut, F> -where - St: TryStream, -{ - #[pin] - stream: St, - f: F, - #[pin] - pending_fut: Option<Fut>, - pending_item: Option<St::Ok>, - done_taking: bool, +pin_project! { + /// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while) + /// method. + #[must_use = "streams do nothing unless polled"] + pub struct TryTakeWhile<St, Fut, F> + where + St: TryStream, + { + #[pin] + stream: St, + f: F, + #[pin] + pending_fut: Option<Fut>, + pending_item: Option<St::Ok>, + done_taking: bool, + } } impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F> @@ -46,8 +48,8 @@ where F: FnMut(&St::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = St::Error>, { - pub(super) fn new(stream: St, f: F) -> TryTakeWhile<St, Fut, F> { - TryTakeWhile { + pub(super) fn new(stream: St, f: F) -> Self { + Self { stream, f, pending_fut: None, @@ -76,9 +78,10 @@ where Poll::Ready(loop { if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { - let take = ready!(fut.try_poll(cx)?); - let item = this.pending_item.take(); + let res = ready!(fut.try_poll(cx)); this.pending_fut.set(None); + let take = res?; + let item = this.pending_item.take(); if take { break item.map(Ok); } else { diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs index a6b31fe..c8fc421 100644 --- a/src/stream/try_stream/try_unfold.rs +++ b/src/stream/try_stream/try_unfold.rs @@ -1,9 +1,10 @@ use core::fmt; use core::pin::Pin; use futures_core::future::TryFuture; +use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`. /// @@ -66,14 +67,15 @@ where } } -/// Stream for the [`try_unfold`] function. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct TryUnfold<T, F, Fut> { - f: F, - state: Option<T>, - #[pin] - fut: Option<Fut>, +pin_project! { + /// Stream for the [`try_unfold`] function. + #[must_use = "streams do nothing unless polled"] + pub struct TryUnfold<T, F, Fut> { + f: F, + state: Option<T>, + #[pin] + fut: Option<Fut>, + } } impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut> diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs index b6f8eae..473bb67 100644 --- a/src/stream/unfold.rs +++ b/src/stream/unfold.rs @@ -1,9 +1,11 @@ +use crate::unfold_state::UnfoldState; use core::fmt; use core::pin::Pin; use futures_core::future::Future; +use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project::pin_project; +use pin_project_lite::pin_project; /// Creates a `Stream` from a seed and a closure returning a `Future`. /// @@ -45,24 +47,24 @@ use pin_project::pin_project; /// # }); /// ``` pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut> - where F: FnMut(T) -> Fut, - Fut: Future<Output = Option<(Item, T)>>, +where + F: FnMut(T) -> Fut, + Fut: Future<Output = Option<(Item, T)>>, { Unfold { f, - state: Some(init), - fut: None, + state: UnfoldState::Value { value: init }, } } -/// Stream for the [`unfold`] function. -#[pin_project] -#[must_use = "streams do nothing unless polled"] -pub struct Unfold<T, F, Fut> { - f: F, - state: Option<T>, - #[pin] - fut: Option<Fut>, +pin_project! { + /// Stream for the [`unfold`] function. + #[must_use = "streams do nothing unless polled"] + pub struct Unfold<T, F, Fut> { + f: F, + #[pin] + state: UnfoldState<T, Fut>, + } } impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut> @@ -73,44 +75,50 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Unfold") .field("state", &self.state) - .field("fut", &self.fut) .finish() } } impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut> - where F: FnMut(T) -> Fut, - Fut: Future<Output = Option<(Item, T)>>, +where + F: FnMut(T) -> Fut, + Fut: Future<Output = Option<(Item, T)>>, { fn is_terminated(&self) -> bool { - self.state.is_none() && self.fut.is_none() + if let UnfoldState::Empty = self.state { + true + } else { + false + } } } impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut> - where F: FnMut(T) -> Fut, - Fut: Future<Output = Option<(Item, T)>>, +where + F: FnMut(T) -> Fut, + Fut: Future<Output = Option<(Item, T)>>, { type Item = Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); - if let Some(state) = this.state.take() { - this.fut.set(Some((this.f)(state))); + if let Some(state) = this.state.as_mut().take_value() { + this.state.set(UnfoldState::Future { + future: (this.f)(state), + }); } - let step = ready!(this.fut.as_mut().as_pin_mut() - .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx)); - this.fut.set(None); + let step = match this.state.as_mut().project_future() { + Some(fut) => ready!(fut.poll(cx)), + None => panic!("Unfold must not be polled after it returned `Poll::Ready(None)`"), + }; if let Some((item, next_state)) = step { - *this.state = Some(next_state); + this.state.set(UnfoldState::Value { value: next_state }); Poll::Ready(Some(item)) } else { + this.state.set(UnfoldState::Empty); Poll::Ready(None) } } diff --git a/src/task/mod.rs b/src/task/mod.rs index fb3b7ad..77e5a96 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -1,4 +1,25 @@ -//! Task notification +//! Tools for working with tasks. +//! +//! This module contains: +//! +//! - [`Spawn`], a trait for spawning new tasks. +//! - [`Context`], a context of an asynchronous task, +//! including a handle for waking up the task. +//! - [`Waker`], a handle for waking up a task. +//! +//! The remaining types and traits in the module are used for implementing +//! executors or dealing with synchronization issues around task wakeup. + +pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; + +pub use futures_task::{ + Spawn, LocalSpawn, SpawnError, + FutureObj, LocalFutureObj, UnsafeFutureObj, +}; + +pub use futures_task::noop_waker; +#[cfg(feature = "std")] +pub use futures_task::noop_waker_ref; cfg_target_has_atomic! { #[cfg(feature = "alloc")] @@ -15,14 +36,3 @@ cfg_target_has_atomic! { mod spawn; pub use self::spawn::{SpawnExt, LocalSpawnExt}; - -pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable}; - -pub use futures_task::{ - Spawn, LocalSpawn, SpawnError, - FutureObj, LocalFutureObj, UnsafeFutureObj, -}; - -pub use futures_task::noop_waker; -#[cfg(feature = "std")] -pub use futures_task::noop_waker_ref; diff --git a/src/unfold_state.rs b/src/unfold_state.rs new file mode 100644 index 0000000..0edc15e --- /dev/null +++ b/src/unfold_state.rs @@ -0,0 +1,39 @@ +use core::pin::Pin; + +use pin_project_lite::pin_project; + +pin_project! { + /// UnfoldState used for stream and sink unfolds + #[project = UnfoldStateProj] + #[project_replace = UnfoldStateProjReplace] + #[derive(Debug)] + pub(crate) enum UnfoldState<T, R> { + Value { + value: T, + }, + Future { + #[pin] + future: R, + }, + Empty, + } +} + +impl<T, R> UnfoldState<T, R> { + pub(crate) fn project_future(self: Pin<&mut Self>) -> Option<Pin<&mut R>> { + match self.project() { + UnfoldStateProj::Future { future } => Some(future), + _ => None, + } + } + + pub(crate) fn take_value(self: Pin<&mut Self>) -> Option<T> { + match &*self { + UnfoldState::Value { .. } => match self.project_replace(UnfoldState::Empty) { + UnfoldStateProjReplace::Value { value } => Some(value), + _ => unreachable!(), + }, + _ => None, + } + } +} |