aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTreehugger Robot <treehugger-gerrit@google.com>2021-02-10 12:32:08 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-02-10 12:32:08 +0000
commitace3f190a64e6095d8888f52cf5e188db07b59af (patch)
tree6768a19a65780281fc232fc768b947d4be2b124b
parentb79273b58553c67dbfc3be350ce662ec1763d8bf (diff)
parent102eb371e74174a76a956011444b11fd7eebe656 (diff)
downloadfutures-util-ace3f190a64e6095d8888f52cf5e188db07b59af.tar.gz
Merge "Upgrade rust/crates/futures-util to 0.3.12" am: 102eb371e7
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/1581956 MUST ONLY BE SUBMITTED BY AUTOMERGER Change-Id: I1094a39a3e2a6f3229cbacef8c0510c400b1050d
-rw-r--r--.cargo_vcs_info.json5
-rw-r--r--Android.bp71
-rw-r--r--Cargo.toml24
-rw-r--r--Cargo.toml.orig25
-rw-r--r--METADATA10
-rw-r--r--TEST_MAPPING2
-rw-r--r--benches_disabled/bilock.rs4
-rw-r--r--src/async_await/join_mod.rs4
-rw-r--r--src/async_await/select_mod.rs4
-rw-r--r--src/compat/compat01as03.rs12
-rw-r--r--src/compat/compat03as01.rs10
-rw-r--r--src/compat/mod.rs2
-rw-r--r--src/fns.rs4
-rw-r--r--src/future/abortable.rs25
-rw-r--r--src/future/either.rs77
-rw-r--r--src/future/future/catch_unwind.rs22
-rw-r--r--src/future/future/flatten.rs57
-rw-r--r--src/future/future/fuse.rs31
-rw-r--r--src/future/future/map.rs56
-rw-r--r--src/future/future/mod.rs2
-rw-r--r--src/future/future/remote_handle.rs34
-rw-r--r--src/future/future/shared.rs70
-rw-r--r--src/future/join.rs17
-rw-r--r--src/future/maybe_done.rs46
-rw-r--r--src/future/mod.rs15
-rw-r--r--src/future/option.rs54
-rw-r--r--src/future/try_future/into_future.rs24
-rw-r--r--src/future/try_future/try_flatten.rs63
-rw-r--r--src/future/try_future/try_flatten_err.rs31
-rw-r--r--src/future/try_join.rs19
-rw-r--r--src/future/try_maybe_done.rs56
-rw-r--r--src/io/allow_std.rs2
-rw-r--r--src/io/buf_reader.rs56
-rw-r--r--src/io/buf_writer.rs58
-rw-r--r--src/io/chain.rs22
-rw-r--r--src/io/close.rs2
-rw-r--r--src/io/copy.rs17
-rw-r--r--src/io/copy_buf.rs22
-rw-r--r--src/io/cursor.rs4
-rw-r--r--src/io/fill_buf.rs2
-rw-r--r--src/io/flush.rs2
-rw-r--r--src/io/into_sink.rs28
-rw-r--r--src/io/lines.rs25
-rw-r--r--src/io/mod.rs21
-rw-r--r--src/io/read.rs2
-rw-r--r--src/io/read_exact.rs3
-rw-r--r--src/io/read_line.rs3
-rw-r--r--src/io/read_to_end.rs40
-rw-r--r--src/io/read_to_string.rs3
-rw-r--r--src/io/read_until.rs1
-rw-r--r--src/io/repeat.rs1
-rw-r--r--src/io/split.rs1
-rw-r--r--src/io/take.rs22
-rw-r--r--src/io/write_all.rs3
-rw-r--r--src/io/write_all_vectored.rs5
-rw-r--r--src/lib.rs116
-rw-r--r--src/lock/bilock.rs4
-rw-r--r--src/lock/mod.rs30
-rw-r--r--src/lock/mutex.rs18
-rw-r--r--src/never.rs5
-rw-r--r--src/sink/buffer.rs28
-rw-r--r--src/sink/close.rs2
-rw-r--r--src/sink/err_into.rs21
-rw-r--r--src/sink/fanout.rs31
-rw-r--r--src/sink/feed.rs49
-rw-r--r--src/sink/flush.rs2
-rw-r--r--src/sink/map_err.rs23
-rw-r--r--src/sink/mod.rs117
-rw-r--r--src/sink/send.rs25
-rw-r--r--src/sink/send_all.rs5
-rw-r--r--src/sink/unfold.rs88
-rw-r--r--src/sink/with.rs42
-rw-r--r--src/sink/with_flat_map.rs28
-rw-r--r--src/stream/empty.rs6
-rw-r--r--src/stream/futures_ordered.rs67
-rw-r--r--src/stream/futures_unordered/mod.rs12
-rw-r--r--src/stream/futures_unordered/task.rs2
-rw-r--r--src/stream/iter.rs2
-rw-r--r--src/stream/mod.rs24
-rw-r--r--src/stream/once.rs18
-rw-r--r--src/stream/pending.rs6
-rw-r--r--src/stream/repeat.rs2
-rw-r--r--src/stream/repeat_with.rs93
-rw-r--r--src/stream/select.rs23
-rw-r--r--src/stream/select_all.rs9
-rw-r--r--src/stream/stream/buffer_unordered.rs31
-rw-r--r--src/stream/stream/buffered.rs32
-rw-r--r--src/stream/stream/catch_unwind.rs23
-rw-r--r--src/stream/stream/chain.rs26
-rw-r--r--src/stream/stream/chunks.rs26
-rw-r--r--src/stream/stream/collect.rs24
-rw-r--r--src/stream/stream/concat.rs24
-rw-r--r--src/stream/stream/cycle.rs71
-rw-r--r--src/stream/stream/enumerate.rs24
-rw-r--r--src/stream/stream/filter.rs32
-rw-r--r--src/stream/stream/filter_map.rs26
-rw-r--r--src/stream/stream/flatten.rs22
-rw-r--r--src/stream/stream/fold.rs28
-rw-r--r--src/stream/stream/for_each.rs26
-rw-r--r--src/stream/stream/for_each_concurrent.rs27
-rw-r--r--src/stream/stream/forward.rs27
-rw-r--r--src/stream/stream/fuse.rs24
-rw-r--r--src/stream/stream/into_future.rs5
-rw-r--r--src/stream/stream/map.rs22
-rw-r--r--src/stream/stream/mod.rs77
-rw-r--r--src/stream/stream/next.rs2
-rw-r--r--src/stream/stream/peek.rs45
-rw-r--r--src/stream/stream/ready_chunks.rs25
-rw-r--r--src/stream/stream/scan.rs26
-rw-r--r--src/stream/stream/select_next_some.rs3
-rw-r--r--src/stream/stream/skip.rs24
-rw-r--r--src/stream/stream/skip_while.rs30
-rw-r--r--src/stream/stream/split.rs1
-rw-r--r--src/stream/stream/take.rs24
-rw-r--r--src/stream/stream/take_until.rs38
-rw-r--r--src/stream/stream/take_while.rs30
-rw-r--r--src/stream/stream/then.rs26
-rw-r--r--src/stream/stream/unzip.rs68
-rw-r--r--src/stream/stream/zip.rs31
-rw-r--r--src/stream/try_stream/and_then.rs22
-rw-r--r--src/stream/try_stream/into_async_read.rs3
-rw-r--r--src/stream/try_stream/into_stream.rs21
-rw-r--r--src/stream/try_stream/mod.rs82
-rw-r--r--src/stream/try_stream/or_else.rs22
-rw-r--r--src/stream/try_stream/try_buffer_unordered.rs29
-rw-r--r--src/stream/try_stream/try_buffered.rs90
-rw-r--r--src/stream/try_stream/try_collect.rs24
-rw-r--r--src/stream/try_stream/try_concat.rs24
-rw-r--r--src/stream/try_stream/try_filter.rs32
-rw-r--r--src/stream/try_stream/try_filter_map.rs29
-rw-r--r--src/stream/try_stream/try_flatten.rs28
-rw-r--r--src/stream/try_stream/try_fold.rs28
-rw-r--r--src/stream/try_stream/try_for_each.rs26
-rw-r--r--src/stream/try_stream/try_for_each_concurrent.rs29
-rw-r--r--src/stream/try_stream/try_next.rs2
-rw-r--r--src/stream/try_stream/try_skip_while.rs37
-rw-r--r--src/stream/try_stream/try_take_while.rs43
-rw-r--r--src/stream/try_stream/try_unfold.rs20
-rw-r--r--src/stream/unfold.rs66
-rw-r--r--src/task/mod.rs34
-rw-r--r--src/unfold_state.rs39
141 files changed, 2434 insertions, 1385 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..4fd4ba3
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,5 @@
+{
+ "git": {
+ "sha1": "1d53a29ec16ccd5b094fb205edb73591455eb4b6"
+ }
+}
diff --git a/Android.bp b/Android.bp
index 4d810a9..89834ae 100644
--- a/Android.bp
+++ b/Android.bp
@@ -32,10 +32,11 @@ rust_defaults {
"libfutures_sink",
"libfutures_task",
"libmemchr",
- "libpin_project",
+ "libpin_project_lite",
"libpin_utils",
"libproc_macro_nested",
"libslab",
+ "libtokio",
],
proc_macros: [
"libfutures_macro",
@@ -46,6 +47,9 @@ rust_defaults {
rust_test_host {
name: "futures-util_host_test_src_lib",
defaults: ["futures-util_defaults"],
+ test_options: {
+ unit_test: true,
+ },
}
rust_test {
@@ -84,7 +88,7 @@ rust_library {
"libfutures_sink",
"libfutures_task",
"libmemchr",
- "libpin_project",
+ "libpin_project_lite",
"libpin_utils",
"libproc_macro_nested",
"libslab",
@@ -100,21 +104,62 @@ rust_library {
}
// dependent_library ["feature_list"]
-// futures-channel-0.3.8 "alloc,std"
-// futures-core-0.3.8 "alloc,std"
-// futures-io-0.3.8 "std"
-// futures-macro-0.3.7
-// futures-sink-0.3.8
-// futures-task-0.3.8 "alloc,once_cell,std"
+// autocfg-1.0.1
+// byteorder-1.4.2 "default,std"
+// bytes-0.4.12
+// cfg-if-0.1.10
+// cfg-if-1.0.0
+// crossbeam-deque-0.7.3
+// crossbeam-epoch-0.8.2 "default,lazy_static,std"
+// crossbeam-queue-0.2.3 "default,std"
+// crossbeam-utils-0.7.2 "default,lazy_static,std"
+// fnv-1.0.7 "default,std"
+// futures-0.1.30 "default,use_std,with-deprecated"
+// futures-channel-0.3.12 "alloc,std"
+// futures-core-0.3.12 "alloc,std"
+// futures-io-0.3.12 "std"
+// futures-macro-0.3.12
+// futures-sink-0.3.12
+// futures-task-0.3.12 "alloc,once_cell,std"
+// iovec-0.1.4
+// lazy_static-1.4.0
+// libc-0.2.86 "default,std"
+// lock_api-0.3.4
+// log-0.4.14
+// maybe-uninit-2.0.0
// memchr-2.3.4 "default,std"
+// memoffset-0.5.6 "default"
+// mio-0.6.23 "default,with-deprecated"
+// mio-uds-0.6.8
+// net2-0.2.37 "default,duration"
+// num_cpus-1.13.0
// once_cell-1.5.2 "alloc,std"
-// pin-project-1.0.2
-// pin-project-internal-1.0.2
+// parking_lot-0.9.0 "default"
+// parking_lot_core-0.6.2
+// pin-project-lite-0.2.4
// pin-utils-0.1.0
// proc-macro-hack-0.5.19
-// proc-macro-nested-0.1.6
+// proc-macro-nested-0.1.7
// proc-macro2-1.0.24 "default,proc-macro"
-// quote-1.0.7 "default,proc-macro"
+// quote-1.0.8 "default,proc-macro"
+// rustc_version-0.2.3
+// scopeguard-1.1.0
+// semver-0.9.0 "default"
+// semver-parser-0.7.0
// slab-0.4.2
-// syn-1.0.53 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
+// smallvec-0.6.14 "default,std"
+// syn-1.0.60 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
+// tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds"
+// tokio-codec-0.1.2
+// tokio-current-thread-0.1.7
+// tokio-executor-0.1.10
+// tokio-fs-0.1.7
+// tokio-io-0.1.13
+// tokio-reactor-0.1.12
+// tokio-sync-0.1.8
+// tokio-tcp-0.1.4
+// tokio-threadpool-0.1.18
+// tokio-timer-0.2.13
+// tokio-udp-0.1.6
+// tokio-uds-0.2.7
// unicode-xid-0.2.1 "default"
diff --git a/Cargo.toml b/Cargo.toml
index 356f96f..1b96b8f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,44 +13,44 @@
[package]
edition = "2018"
name = "futures-util"
-version = "0.3.7"
+version = "0.3.12"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
description = "Common utilities and extension traits for the futures-rs library.\n"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-util/0.3.7"
+documentation = "https://docs.rs/futures-util/0.3"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies.futures-channel]
-version = "0.3.7"
+version = "0.3.12"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.7"
+version = "0.3.12"
default-features = false
[dependencies.futures-io]
-version = "0.3.7"
+version = "0.3.12"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.7"
+version = "=0.3.12"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.7"
+version = "0.3.12"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.7"
+version = "0.3.12"
default-features = false
[dependencies.futures_01]
@@ -62,14 +62,14 @@ package = "futures"
version = "2.2"
optional = true
-[dependencies.pin-project]
-version = "1.0.1"
+[dependencies.pin-project-lite]
+version = "0.2.4"
[dependencies.pin-utils]
version = "0.1.0"
[dependencies.proc-macro-hack]
-version = "0.5.9"
+version = "0.5.19"
optional = true
[dependencies.proc-macro-nested]
@@ -83,6 +83,8 @@ optional = true
[dependencies.tokio-io]
version = "0.1.9"
optional = true
+[dev-dependencies.tokio]
+version = "0.1.11"
[features]
alloc = ["futures-core/alloc", "futures-task/alloc"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index c768e59..010902b 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,12 +1,12 @@
[package]
name = "futures-util"
edition = "2018"
-version = "0.3.7"
+version = "0.3.12"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-util/0.3.7"
+documentation = "https://docs.rs/futures-util/0.3"
description = """
Common utilities and extension traits for the futures-rs library.
"""
@@ -33,20 +33,25 @@ read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"]
write-all-vectored = ["io"]
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.7", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.7", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.7", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.7", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.7", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.7", default-features = false, optional = true }
-proc-macro-hack = { version = "0.5.9", optional = true }
+futures-core = { path = "../futures-core", version = "0.3.12", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.12", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.12", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.12", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.12", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.12", default-features = false, optional = true }
+proc-macro-hack = { version = "0.5.19", optional = true }
proc-macro-nested = { version = "0.1.2", optional = true }
slab = { version = "0.4.2", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0"
-pin-project = "1.0.1"
+pin-project-lite = "0.2.4"
+
+[dev-dependencies]
+futures = { path = "../futures", features = ["async-await", "thread-pool"] }
+futures-test = { path = "../futures-test" }
+tokio = "0.1.11"
[package.metadata.docs.rs]
all-features = true
diff --git a/METADATA b/METADATA
index 179eb51..f29b4dd 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-util/futures-util-0.3.7.crate"
+ value: "https://static.crates.io/crates/futures-util/futures-util-0.3.12.crate"
}
- version: "0.3.7"
+ version: "0.3.12"
license_type: NOTICE
last_upgrade_date {
- year: 2020
- month: 10
- day: 25
+ year: 2021
+ month: 2
+ day: 9
}
}
diff --git a/TEST_MAPPING b/TEST_MAPPING
index b518f2a..7e10dd0 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -1,4 +1,4 @@
-// Generated by cargo2android.py for tests in Android.bp
+// Generated by update_crate_tests.py for tests that depend on this crate.
{
"presubmit": [
{
diff --git a/benches_disabled/bilock.rs b/benches_disabled/bilock.rs
index 78b5edb..48afe3c 100644
--- a/benches_disabled/bilock.rs
+++ b/benches_disabled/bilock.rs
@@ -29,8 +29,8 @@ struct LockStream {
}
impl LockStream {
- fn new(lock: BiLock<u32>) -> LockStream {
- LockStream {
+ fn new(lock: BiLock<u32>) -> Self {
+ Self {
lock: lock.lock()
}
}
diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs
index 4200c08..d6ddb6d 100644
--- a/src/async_await/join_mod.rs
+++ b/src/async_await/join_mod.rs
@@ -82,11 +82,11 @@ macro_rules! document_join_macro {
}
#[doc(hidden)]
-#[proc_macro_hack(support_nested)]
+#[proc_macro_hack(support_nested, only_hack_old_rustc)]
pub use futures_macro::join_internal;
#[doc(hidden)]
-#[proc_macro_hack(support_nested)]
+#[proc_macro_hack(support_nested, only_hack_old_rustc)]
pub use futures_macro::try_join_internal;
document_join_macro! {
diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs
index 47eca4d..59bca08 100644
--- a/src/async_await/select_mod.rs
+++ b/src/async_await/select_mod.rs
@@ -310,11 +310,11 @@ macro_rules! document_select_macro {
#[cfg(feature = "std")]
#[doc(hidden)]
-#[proc_macro_hack(support_nested)]
+#[proc_macro_hack(support_nested, only_hack_old_rustc)]
pub use futures_macro::select_internal;
#[doc(hidden)]
-#[proc_macro_hack(support_nested)]
+#[proc_macro_hack(support_nested, only_hack_old_rustc)]
pub use futures_macro::select_biased_internal;
document_select_macro! {
diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs
index 95025d2..bc3aee3 100644
--- a/src/compat/compat01as03.rs
+++ b/src/compat/compat01as03.rs
@@ -32,8 +32,8 @@ impl<T> Unpin for Compat01As03<T> {}
impl<T> Compat01As03<T> {
/// Wraps a futures 0.1 Future, Stream, AsyncRead, or AsyncWrite
/// object in a futures 0.3-compatible wrapper.
- pub fn new(object: T) -> Compat01As03<T> {
- Compat01As03 {
+ pub fn new(object: T) -> Self {
+ Self {
inner: spawn01(object),
}
}
@@ -197,8 +197,8 @@ impl<S, SinkItem> Unpin for Compat01As03Sink<S, SinkItem> {}
#[cfg(feature = "sink")]
impl<S, SinkItem> Compat01As03Sink<S, SinkItem> {
/// Wraps a futures 0.1 Sink object in a futures 0.3-compatible wrapper.
- pub fn new(inner: S) -> Compat01As03Sink<S, SinkItem> {
- Compat01As03Sink {
+ pub fn new(inner: S) -> Self {
+ Self {
inner: spawn01(inner),
buffer: None,
close_started: false
@@ -344,10 +344,10 @@ struct NotifyWaker(task03::Waker);
struct WakerToHandle<'a>(&'a task03::Waker);
impl From<WakerToHandle<'_>> for NotifyHandle01 {
- fn from(handle: WakerToHandle<'_>) -> NotifyHandle01 {
+ fn from(handle: WakerToHandle<'_>) -> Self {
let ptr = Box::new(NotifyWaker(handle.0.clone()));
- unsafe { NotifyHandle01::new(Box::into_raw(ptr)) }
+ unsafe { Self::new(Box::into_raw(ptr)) }
}
}
diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs
index 4841c5e..3f1eebb 100644
--- a/src/compat/compat03as01.rs
+++ b/src/compat/compat03as01.rs
@@ -54,8 +54,8 @@ impl<T> Compat<T> {
/// For types which implement appropriate futures `0.3`
/// traits, the result will be a type which implements
/// the corresponding futures 0.1 type.
- pub fn new(inner: T) -> Compat<T> {
- Compat { inner }
+ pub fn new(inner: T) -> Self {
+ Self { inner }
}
/// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
@@ -80,7 +80,7 @@ impl<T> Compat<T> {
impl<T, Item> CompatSink<T, Item> {
/// Creates a new [`CompatSink`].
pub fn new(inner: T) -> Self {
- CompatSink {
+ Self {
inner,
_phantom: PhantomData,
}
@@ -174,8 +174,8 @@ where
struct Current(task01::Task);
impl Current {
- fn new() -> Current {
- Current(task01::current())
+ fn new() -> Self {
+ Self(task01::current())
}
fn as_waker(&self) -> WakerRef<'_> {
diff --git a/src/compat/mod.rs b/src/compat/mod.rs
index 897a50a..c5edcc5 100644
--- a/src/compat/mod.rs
+++ b/src/compat/mod.rs
@@ -1,4 +1,4 @@
-//! Futures 0.1 / 0.3 shims
+//! Interop between `futures` 0.1 and 0.3.
//!
//! This module is only available when the `compat` feature of this
//! library is activated.
diff --git a/src/fns.rs b/src/fns.rs
index 6908bff..cb62391 100644
--- a/src/fns.rs
+++ b/src/fns.rs
@@ -75,7 +75,7 @@ pub struct OkFn<E>(PhantomData<fn(E)>);
impl<E> Default for OkFn<E> {
fn default() -> Self {
- OkFn(PhantomData)
+ Self(PhantomData)
}
}
@@ -344,7 +344,7 @@ pub struct IntoFn<T>(PhantomData<fn() -> T>);
impl<T> Default for IntoFn<T> {
fn default() -> Self {
- IntoFn(PhantomData)
+ Self(PhantomData)
}
}
impl<A, T> FnOnce1<A> for IntoFn<T> where A: Into<T> {
diff --git a/src/future/abortable.rs b/src/future/abortable.rs
index 3a6b587..1fc75b0 100644
--- a/src/future/abortable.rs
+++ b/src/future/abortable.rs
@@ -5,16 +5,17 @@ use core::fmt;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use alloc::sync::Arc;
-use pin_project::pin_project;
-
-/// A future which can be remotely short-circuited using an `AbortHandle`.
-#[pin_project]
-#[derive(Debug, Clone)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct Abortable<Fut> {
- #[pin]
- future: Fut,
- inner: Arc<AbortInner>,
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// A future which can be remotely short-circuited using an `AbortHandle`.
+ #[derive(Debug, Clone)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Abortable<Fut> {
+ #[pin]
+ future: Fut,
+ inner: Arc<AbortInner>,
+ }
}
impl<Fut> Abortable<Fut> where Fut: Future {
@@ -38,7 +39,7 @@ impl<Fut> Abortable<Fut> where Fut: Future {
/// # });
/// ```
pub fn new(future: Fut, reg: AbortRegistration) -> Self {
- Abortable {
+ Self {
future,
inner: reg.inner,
}
@@ -84,7 +85,7 @@ impl AbortHandle {
});
(
- AbortHandle {
+ Self {
inner: inner.clone(),
},
AbortRegistration {
diff --git a/src/future/either.rs b/src/future/either.rs
index aa17fa7..a1b9f0a 100644
--- a/src/future/either.rs
+++ b/src/future/either.rs
@@ -4,17 +4,26 @@ use futures_core::future::{FusedFuture, Future};
use futures_core::stream::{FusedStream, Stream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
/// Combines two different futures, streams, or sinks having the same associated types into a single
/// type.
-#[pin_project(project = EitherProj)]
#[derive(Debug, Clone)]
pub enum Either<A, B> {
/// First branch of the type
- Left(#[pin] A),
+ Left(/* #[pin] */ A),
/// Second branch of the type
- Right(#[pin] B),
+ Right(/* #[pin] */ B),
+}
+
+impl<A, B> Either<A, B> {
+ fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
+ unsafe {
+ match self.get_unchecked_mut() {
+ Either::Left(a) => Either::Left(Pin::new_unchecked(a)),
+ Either::Right(b) => Either::Right(Pin::new_unchecked(b)),
+ }
+ }
+ }
}
impl<A, B, T> Either<(T, A), (T, B)> {
@@ -60,8 +69,8 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
- EitherProj::Left(x) => x.poll(cx),
- EitherProj::Right(x) => x.poll(cx),
+ Either::Left(x) => x.poll(cx),
+ Either::Right(x) => x.poll(cx),
}
}
}
@@ -88,8 +97,8 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
- EitherProj::Left(x) => x.poll_next(cx),
- EitherProj::Right(x) => x.poll_next(cx),
+ Either::Left(x) => x.poll_next(cx),
+ Either::Right(x) => x.poll_next(cx),
}
}
}
@@ -117,29 +126,29 @@ where
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
- EitherProj::Left(x) => x.poll_ready(cx),
- EitherProj::Right(x) => x.poll_ready(cx),
+ Either::Left(x) => x.poll_ready(cx),
+ Either::Right(x) => x.poll_ready(cx),
}
}
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
match self.project() {
- EitherProj::Left(x) => x.start_send(item),
- EitherProj::Right(x) => x.start_send(item),
+ Either::Left(x) => x.start_send(item),
+ Either::Right(x) => x.start_send(item),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
- EitherProj::Left(x) => x.poll_flush(cx),
- EitherProj::Right(x) => x.poll_flush(cx),
+ Either::Left(x) => x.poll_flush(cx),
+ Either::Right(x) => x.poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
- EitherProj::Left(x) => x.poll_close(cx),
- EitherProj::Right(x) => x.poll_close(cx),
+ Either::Left(x) => x.poll_close(cx),
+ Either::Right(x) => x.poll_close(cx),
}
}
}
@@ -176,8 +185,8 @@ mod if_std {
buf: &mut [u8],
) -> Poll<Result<usize>> {
match self.project() {
- EitherProj::Left(x) => x.poll_read(cx, buf),
- EitherProj::Right(x) => x.poll_read(cx, buf),
+ Either::Left(x) => x.poll_read(cx, buf),
+ Either::Right(x) => x.poll_read(cx, buf),
}
}
@@ -187,8 +196,8 @@ mod if_std {
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
match self.project() {
- EitherProj::Left(x) => x.poll_read_vectored(cx, bufs),
- EitherProj::Right(x) => x.poll_read_vectored(cx, bufs),
+ Either::Left(x) => x.poll_read_vectored(cx, bufs),
+ Either::Right(x) => x.poll_read_vectored(cx, bufs),
}
}
}
@@ -204,8 +213,8 @@ mod if_std {
buf: &[u8],
) -> Poll<Result<usize>> {
match self.project() {
- EitherProj::Left(x) => x.poll_write(cx, buf),
- EitherProj::Right(x) => x.poll_write(cx, buf),
+ Either::Left(x) => x.poll_write(cx, buf),
+ Either::Right(x) => x.poll_write(cx, buf),
}
}
@@ -215,22 +224,22 @@ mod if_std {
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
match self.project() {
- EitherProj::Left(x) => x.poll_write_vectored(cx, bufs),
- EitherProj::Right(x) => x.poll_write_vectored(cx, bufs),
+ Either::Left(x) => x.poll_write_vectored(cx, bufs),
+ Either::Right(x) => x.poll_write_vectored(cx, bufs),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.project() {
- EitherProj::Left(x) => x.poll_flush(cx),
- EitherProj::Right(x) => x.poll_flush(cx),
+ Either::Left(x) => x.poll_flush(cx),
+ Either::Right(x) => x.poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.project() {
- EitherProj::Left(x) => x.poll_close(cx),
- EitherProj::Right(x) => x.poll_close(cx),
+ Either::Left(x) => x.poll_close(cx),
+ Either::Right(x) => x.poll_close(cx),
}
}
}
@@ -246,8 +255,8 @@ mod if_std {
pos: SeekFrom,
) -> Poll<Result<u64>> {
match self.project() {
- EitherProj::Left(x) => x.poll_seek(cx, pos),
- EitherProj::Right(x) => x.poll_seek(cx, pos),
+ Either::Left(x) => x.poll_seek(cx, pos),
+ Either::Right(x) => x.poll_seek(cx, pos),
}
}
}
@@ -259,15 +268,15 @@ mod if_std {
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
match self.project() {
- EitherProj::Left(x) => x.poll_fill_buf(cx),
- EitherProj::Right(x) => x.poll_fill_buf(cx),
+ Either::Left(x) => x.poll_fill_buf(cx),
+ Either::Right(x) => x.poll_fill_buf(cx),
}
}
fn consume(self: Pin<&mut Self>, amt: usize) {
match self.project() {
- EitherProj::Left(x) => x.consume(amt),
- EitherProj::Right(x) => x.consume(amt),
+ Either::Left(x) => x.consume(amt),
+ Either::Right(x) => x.consume(amt),
}
}
}
diff --git a/src/future/future/catch_unwind.rs b/src/future/future/catch_unwind.rs
index 33839f6..3f16577 100644
--- a/src/future/future/catch_unwind.rs
+++ b/src/future/future/catch_unwind.rs
@@ -4,17 +4,21 @@ use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct CatchUnwind<Fut>(#[pin] Fut);
+pin_project! {
+ /// Future for the [`catch_unwind`](super::FutureExt::catch_unwind) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct CatchUnwind<Fut> {
+ #[pin]
+ future: Fut,
+ }
+}
impl<Fut> CatchUnwind<Fut> where Fut: Future + UnwindSafe {
- pub(super) fn new(future: Fut) -> CatchUnwind<Fut> {
- CatchUnwind(future)
+ pub(super) fn new(future: Fut) -> Self {
+ Self { future }
}
}
@@ -24,7 +28,7 @@ impl<Fut> Future for CatchUnwind<Fut>
type Output = Result<Fut::Output, Box<dyn Any + Send>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let f = self.project().0;
+ let f = self.project().future;
catch_unwind(AssertUnwindSafe(|| f.poll(cx)))?.map(Ok)
}
}
diff --git a/src/future/future/flatten.rs b/src/future/future/flatten.rs
index 53f75e2..0c48a4f 100644
--- a/src/future/future/flatten.rs
+++ b/src/future/future/flatten.rs
@@ -1,22 +1,25 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-#[pin_project(project = FlattenProj)]
-#[derive(Debug)]
-pub enum Flatten<Fut1, Fut2> {
- First(#[pin] Fut1),
- Second(#[pin] Fut2),
- Empty,
+pin_project! {
+ #[project = FlattenProj]
+ #[derive(Debug)]
+ pub enum Flatten<Fut1, Fut2> {
+ First { #[pin] f: Fut1 },
+ Second { #[pin] f: Fut2 },
+ Empty,
+ }
}
impl<Fut1, Fut2> Flatten<Fut1, Fut2> {
pub(crate) fn new(future: Fut1) -> Self {
- Flatten::First(future)
+ Self::First { f: future }
}
}
@@ -26,7 +29,7 @@ impl<Fut> FusedFuture for Flatten<Fut, Fut::Output>
{
fn is_terminated(&self) -> bool {
match self {
- Flatten::Empty => true,
+ Self::Empty => true,
_ => false,
}
}
@@ -41,13 +44,13 @@ impl<Fut> Future for Flatten<Fut, Fut::Output>
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
match self.as_mut().project() {
- FlattenProj::First(f) => {
+ FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
- self.set(Flatten::Second(f));
+ self.set(Self::Second { f });
},
- FlattenProj::Second(f) => {
+ FlattenProj::Second { f } => {
let output = ready!(f.poll(cx));
- self.set(Flatten::Empty);
+ self.set(Self::Empty);
break output;
},
FlattenProj::Empty => panic!("Flatten polled after completion"),
@@ -62,7 +65,7 @@ impl<Fut> FusedStream for Flatten<Fut, Fut::Output>
{
fn is_terminated(&self) -> bool {
match self {
- Flatten::Empty => true,
+ Self::Empty => true,
_ => false,
}
}
@@ -77,14 +80,14 @@ impl<Fut> Stream for Flatten<Fut, Fut::Output>
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
match self.as_mut().project() {
- FlattenProj::First(f) => {
+ FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
- self.set(Flatten::Second(f));
+ self.set(Self::Second { f });
},
- FlattenProj::Second(f) => {
+ FlattenProj::Second { f } => {
let output = ready!(f.poll_next(cx));
if output.is_none() {
- self.set(Flatten::Empty);
+ self.set(Self::Empty);
}
break output;
},
@@ -109,11 +112,11 @@ where
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(loop {
match self.as_mut().project() {
- FlattenProj::First(f) => {
+ FlattenProj::First { f } => {
let f = ready!(f.poll(cx));
- self.set(Flatten::Second(f));
+ self.set(Self::Second { f });
},
- FlattenProj::Second(f) => {
+ FlattenProj::Second { f } => {
break ready!(f.poll_ready(cx));
},
FlattenProj::Empty => panic!("poll_ready called after eof"),
@@ -123,16 +126,16 @@ where
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
match self.project() {
- FlattenProj::First(_) => panic!("poll_ready not called first"),
- FlattenProj::Second(f) => f.start_send(item),
+ FlattenProj::First { .. } => panic!("poll_ready not called first"),
+ FlattenProj::Second { f } => f.start_send(item),
FlattenProj::Empty => panic!("start_send called after eof"),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
- FlattenProj::First(_) => Poll::Ready(Ok(())),
- FlattenProj::Second(f) => f.poll_flush(cx),
+ FlattenProj::First { .. } => Poll::Ready(Ok(())),
+ FlattenProj::Second { f } => f.poll_flush(cx),
FlattenProj::Empty => panic!("poll_flush called after eof"),
}
}
@@ -142,11 +145,11 @@ where
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let res = match self.as_mut().project() {
- FlattenProj::Second(f) => f.poll_close(cx),
+ FlattenProj::Second { f } => f.poll_close(cx),
_ => Poll::Ready(Ok(())),
};
if res.is_ready() {
- self.set(Flatten::Empty);
+ self.set(Self::Empty);
}
res
}
diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs
index 9f2e1ca..f4284ba 100644
--- a/src/future/future/fuse.rs
+++ b/src/future/future/fuse.rs
@@ -1,17 +1,22 @@
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
+use futures_core::ready;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`fuse`](super::FutureExt::fuse) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct Fuse<Fut>(#[pin] Option<Fut>);
+pin_project! {
+ /// Future for the [`fuse`](super::FutureExt::fuse) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Fuse<Fut> {
+ #[pin]
+ inner: Option<Fut>,
+ }
+}
impl<Fut> Fuse<Fut> {
- pub(super) fn new(f: Fut) -> Fuse<Fut> {
- Fuse(Some(f))
+ pub(super) fn new(f: Fut) -> Self {
+ Self { inner: Some(f) }
}
}
@@ -61,14 +66,14 @@ impl<Fut: Future> Fuse<Fut> {
/// }
/// # });
/// ```
- pub fn terminated() -> Fuse<Fut> {
- Fuse(None)
+ pub fn terminated() -> Self {
+ Self { inner: None }
}
}
impl<Fut: Future> FusedFuture for Fuse<Fut> {
fn is_terminated(&self) -> bool {
- self.0.is_none()
+ self.inner.is_none()
}
}
@@ -76,10 +81,10 @@ impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
- Poll::Ready(match self.as_mut().project().0.as_pin_mut() {
+ Poll::Ready(match self.as_mut().project().inner.as_pin_mut() {
Some(fut) => {
let output = ready!(fut.poll(cx));
- self.project().0.set(None);
+ self.project().inner.set(None);
output
},
None => return Poll::Pending,
diff --git a/src/future/future/map.rs b/src/future/future/map.rs
index 8e7f636..7471aba 100644
--- a/src/future/future/map.rs
+++ b/src/future/future/map.rs
@@ -1,45 +1,51 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use crate::fns::FnOnce1;
-/// Internal Map future
-#[pin_project(project = MapProj, project_replace = MapProjOwn)]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub enum Map<Fut, F> {
- Incomplete {
- #[pin]
- future: Fut,
- f: F,
- },
- Complete,
+pin_project! {
+ /// Internal Map future
+ #[project = MapProj]
+ #[project_replace = MapProjReplace]
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub enum Map<Fut, F> {
+ Incomplete {
+ #[pin]
+ future: Fut,
+ f: F,
+ },
+ Complete,
+ }
}
impl<Fut, F> Map<Fut, F> {
/// Creates a new Map.
- pub(crate) fn new(future: Fut, f: F) -> Map<Fut, F> {
- Map::Incomplete { future, f }
+ pub(crate) fn new(future: Fut, f: F) -> Self {
+ Self::Incomplete { future, f }
}
}
impl<Fut, F, T> FusedFuture for Map<Fut, F>
- where Fut: Future,
- F: FnOnce1<Fut::Output, Output=T>,
+where
+ Fut: Future,
+ F: FnOnce1<Fut::Output, Output = T>,
{
fn is_terminated(&self) -> bool {
match self {
- Map::Incomplete { .. } => false,
- Map::Complete => true,
+ Self::Incomplete { .. } => false,
+ Self::Complete => true,
}
}
}
impl<Fut, F, T> Future for Map<Fut, F>
- where Fut: Future,
- F: FnOnce1<Fut::Output, Output=T>,
+where
+ Fut: Future,
+ F: FnOnce1<Fut::Output, Output = T>,
{
type Output = T;
@@ -48,11 +54,13 @@ impl<Fut, F, T> Future for Map<Fut, F>
MapProj::Incomplete { future, .. } => {
let output = ready!(future.poll(cx));
match self.project_replace(Map::Complete) {
- MapProjOwn::Incomplete { f, .. } => Poll::Ready(f.call_once(output)),
- MapProjOwn::Complete => unreachable!(),
+ MapProjReplace::Incomplete { f, .. } => Poll::Ready(f.call_once(output)),
+ MapProjReplace::Complete => unreachable!(),
}
- },
- MapProj::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"),
+ }
+ MapProj::Complete => {
+ panic!("Map must not be polled after it returned `Poll::Ready`")
+ }
}
}
}
diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs
index f5d5dd2..f01d346 100644
--- a/src/future/future/mod.rs
+++ b/src/future/future/mod.rs
@@ -114,7 +114,7 @@ pub use self::remote_handle::{Remote, RemoteHandle};
mod shared;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::shared::Shared;
+pub use self::shared::{Shared, WeakShared};
impl<T: ?Sized> FutureExt for T where T: Future {}
diff --git a/src/future/future/remote_handle.rs b/src/future/future/remote_handle.rs
index 598f63c..0d33ea5 100644
--- a/src/future/future/remote_handle.rs
+++ b/src/future/future/remote_handle.rs
@@ -4,6 +4,7 @@ use {
futures_core::{
future::Future,
task::{Context, Poll},
+ ready,
},
std::{
any::Any,
@@ -16,7 +17,7 @@ use {
},
thread,
},
- pin_project::pin_project,
+ pin_project_lite::pin_project,
};
/// The handle to a remote future returned by
@@ -69,16 +70,17 @@ impl<T: 'static> Future for RemoteHandle<T> {
type SendMsg<Fut> = Result<<Fut as Future>::Output, Box<(dyn Any + Send + 'static)>>;
-/// A future which sends its output to the corresponding `RemoteHandle`.
-/// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
-pub struct Remote<Fut: Future> {
- tx: Option<Sender<SendMsg<Fut>>>,
- keep_running: Arc<AtomicBool>,
- #[pin]
- future: CatchUnwind<AssertUnwindSafe<Fut>>,
+pin_project! {
+ /// A future which sends its output to the corresponding `RemoteHandle`.
+ /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
+ pub struct Remote<Fut: Future> {
+ tx: Option<Sender<SendMsg<Fut>>>,
+ keep_running: Arc<AtomicBool>,
+ #[pin]
+ future: CatchUnwind<AssertUnwindSafe<Fut>>,
+ }
}
impl<Fut: Future + fmt::Debug> fmt::Debug for Remote<Fut> {
@@ -95,11 +97,11 @@ impl<Fut: Future> Future for Remote<Fut> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = self.project();
- if let Poll::Ready(_) = this.tx.as_mut().unwrap().poll_canceled(cx) {
- if !this.keep_running.load(Ordering::SeqCst) {
- // Cancelled, bail out
- return Poll::Ready(())
- }
+ if this.tx.as_mut().unwrap().poll_canceled(cx).is_ready()
+ && !this.keep_running.load(Ordering::SeqCst)
+ {
+ // Cancelled, bail out
+ return Poll::Ready(());
}
let output = ready!(this.future.poll(cx));
diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs
index 79ad5c3..53635b5 100644
--- a/src/future/future/shared.rs
+++ b/src/future/future/shared.rs
@@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, Weak};
/// Future for the [`shared`](super::FutureExt::shared) method.
#[must_use = "futures do nothing unless you `.await` or poll them"]
@@ -26,6 +26,9 @@ struct Notifier {
wakers: Mutex<Option<Slab<Option<Waker>>>>,
}
+/// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`.
+pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>);
+
// The future itself is polled behind the `Arc`, so it won't be moved
// when `Shared` is moved.
impl<Fut: Future> Unpin for Shared<Fut> {}
@@ -45,6 +48,12 @@ impl<Fut: Future> fmt::Debug for Inner<Fut> {
}
}
+impl<Fut: Future> fmt::Debug for WeakShared<Fut> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("WeakShared").finish()
+ }
+}
+
enum FutureOrOutput<Fut: Future> {
Future(Fut),
Output(Fut::Output),
@@ -72,7 +81,7 @@ const POISONED: usize = 3;
const NULL_WAKER_KEY: usize = usize::max_value();
impl<Fut: Future> Shared<Fut> {
- pub(super) fn new(future: Fut) -> Shared<Fut> {
+ pub(super) fn new(future: Fut) -> Self {
let inner = Inner {
future_or_output: UnsafeCell::new(FutureOrOutput::Future(future)),
notifier: Arc::new(Notifier {
@@ -81,7 +90,7 @@ impl<Fut: Future> Shared<Fut> {
}),
};
- Shared {
+ Self {
inner: Some(Arc::new(inner)),
waker_key: NULL_WAKER_KEY,
}
@@ -107,6 +116,16 @@ where
}
None
}
+
+ /// Creates a new [`WeakShared`] for this [`Shared`].
+ ///
+ /// Returns [`None`] if it has already been polled to completion.
+ pub fn downgrade(&self) -> Option<WeakShared<Fut>> {
+ if let Some(inner) = self.inner.as_ref() {
+ return Some(WeakShared(Arc::downgrade(inner)));
+ }
+ None
+ }
}
impl<Fut> Inner<Fut>
@@ -118,7 +137,7 @@ where
/// is `COMPLETE`
unsafe fn output(&self) -> &Fut::Output {
match &*self.future_or_output.get() {
- FutureOrOutput::Output(ref item) => &item,
+ FutureOrOutput::Output(ref item) => item,
FutureOrOutput::Future(_) => unreachable!(),
}
}
@@ -191,7 +210,12 @@ where
inner.record_waker(&mut this.waker_key, cx);
- match inner.notifier.state.compare_and_swap(IDLE, POLLING, SeqCst) {
+ match inner
+ .notifier
+ .state
+ .compare_exchange(IDLE, POLLING, SeqCst, SeqCst)
+ .unwrap_or_else(|x| x)
+ {
IDLE => {
// Lock acquired, fall through
}
@@ -236,14 +260,18 @@ where
match future.poll(&mut cx) {
Poll::Pending => {
- match inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) {
- POLLING => {
- // Success
- drop(_reset);
- this.inner = Some(inner);
- return Poll::Pending;
- }
- _ => unreachable!(),
+ if inner
+ .notifier
+ .state
+ .compare_exchange(POLLING, IDLE, SeqCst, SeqCst)
+ .is_ok()
+ {
+ // Success
+ drop(_reset);
+ this.inner = Some(inner);
+ return Poll::Pending;
+ } else {
+ unreachable!()
}
}
Poll::Ready(output) => output,
@@ -278,7 +306,7 @@ where
Fut: Future,
{
fn clone(&self) -> Self {
- Shared {
+ Self {
inner: self.inner.clone(),
waker_key: NULL_WAKER_KEY,
}
@@ -314,3 +342,17 @@ impl ArcWake for Notifier {
}
}
}
+
+impl<Fut: Future> WeakShared<Fut>
+{
+ /// Attempts to upgrade this [`WeakShared`] into a [`Shared`].
+ ///
+ /// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled
+ /// to completion.
+ pub fn upgrade(&self) -> Option<Shared<Fut>> {
+ Some(Shared {
+ inner: Some(self.0.upgrade()?),
+ waker_key: NULL_WAKER_KEY,
+ })
+ }
+}
diff --git a/src/future/join.rs b/src/future/join.rs
index 363e119..cfe53a7 100644
--- a/src/future/join.rs
+++ b/src/future/join.rs
@@ -5,7 +5,7 @@ use core::fmt;
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use super::assert_future;
@@ -14,11 +14,12 @@ macro_rules! generate {
$(#[$doc:meta])*
($Join:ident, <$($Fut:ident),*>),
)*) => ($(
- $(#[$doc])*
- #[pin_project]
- #[must_use = "futures do nothing unless you `.await` or poll them"]
- pub struct $Join<$($Fut: Future),*> {
- $(#[pin] $Fut: MaybeDone<$Fut>,)*
+ pin_project! {
+ $(#[$doc])*
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct $Join<$($Fut: Future),*> {
+ $(#[pin] $Fut: MaybeDone<$Fut>,)*
+ }
}
impl<$($Fut),*> fmt::Debug for $Join<$($Fut),*>
@@ -36,8 +37,8 @@ macro_rules! generate {
}
impl<$($Fut: Future),*> $Join<$($Fut),*> {
- fn new($($Fut: $Fut),*) -> $Join<$($Fut),*> {
- $Join {
+ fn new($($Fut: $Fut),*) -> Self {
+ Self {
$($Fut: maybe_done($Fut)),*
}
}
diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs
index 5120a9b..bb5579e 100644
--- a/src/future/maybe_done.rs
+++ b/src/future/maybe_done.rs
@@ -1,18 +1,18 @@
//! Definition of the MaybeDone combinator
+use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
/// A future that may have completed.
///
/// This is created by the [`maybe_done()`] function.
-#[pin_project(project = MaybeDoneProj, project_replace = MaybeDoneProjOwn)]
#[derive(Debug)]
pub enum MaybeDone<Fut: Future> {
/// A not-yet-completed future
- Future(#[pin] Fut),
+ Future(/* #[pin] */ Fut),
/// The output of the completed future
Done(Fut::Output),
/// The empty variant after the result of a [`MaybeDone`] has been
@@ -20,6 +20,8 @@ pub enum MaybeDone<Fut: Future> {
Gone,
}
+impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
+
/// Wraps a future into a `MaybeDone`
///
/// # Examples
@@ -48,9 +50,11 @@ impl<Fut: Future> MaybeDone<Fut> {
/// has not yet been called.
#[inline]
pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
- match self.project() {
- MaybeDoneProj::Done(res) => Some(res),
- _ => None,
+ unsafe {
+ match self.get_unchecked_mut() {
+ MaybeDone::Done(res) => Some(res),
+ _ => None,
+ }
}
}
@@ -59,12 +63,14 @@ impl<Fut: Future> MaybeDone<Fut> {
#[inline]
pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
match &*self {
- MaybeDone::Done(_) => {}
- MaybeDone::Future(_) | MaybeDone::Gone => return None,
+ Self::Done(_) => {}
+ Self::Future(_) | Self::Gone => return None,
}
- match self.project_replace(MaybeDone::Gone) {
- MaybeDoneProjOwn::Done(output) => Some(output),
- _ => unreachable!(),
+ unsafe {
+ match mem::replace(self.get_unchecked_mut(), Self::Gone) {
+ MaybeDone::Done(output) => Some(output),
+ _ => unreachable!(),
+ }
}
}
}
@@ -72,8 +78,8 @@ impl<Fut: Future> MaybeDone<Fut> {
impl<Fut: Future> FusedFuture for MaybeDone<Fut> {
fn is_terminated(&self) -> bool {
match self {
- MaybeDone::Future(_) => false,
- MaybeDone::Done(_) | MaybeDone::Gone => true,
+ Self::Future(_) => false,
+ Self::Done(_) | Self::Gone => true,
}
}
}
@@ -82,13 +88,15 @@ impl<Fut: Future> Future for MaybeDone<Fut> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.as_mut().project() {
- MaybeDoneProj::Future(f) => {
- let res = ready!(f.poll(cx));
- self.set(MaybeDone::Done(res));
+ unsafe {
+ match self.as_mut().get_unchecked_mut() {
+ MaybeDone::Future(f) => {
+ let res = ready!(Pin::new_unchecked(f).poll(cx));
+ self.set(Self::Done(res));
+ }
+ MaybeDone::Done(_) => {}
+ MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
- MaybeDoneProj::Done(_) => {}
- MaybeDoneProj::Gone => panic!("MaybeDone polled after value taken"),
}
Poll::Ready(())
}
diff --git a/src/future/mod.rs b/src/future/mod.rs
index 3f19c19..ab29823 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -1,8 +1,13 @@
-//! Futures
+//! Asynchronous values.
//!
-//! This module contains a number of functions for working with `Future`s,
-//! including the [`FutureExt`] trait and the [`TryFutureExt`] trait which add
-//! methods to `Future` types.
+//! This module contains:
+//!
+//! - The [`Future`] trait.
+//! - The [`FutureExt`] and [`TryFutureExt`] trait, which provides adapters for
+//! chaining and composing futures.
+//! - Top-level future combinators like [`lazy`](lazy()) which creates a future
+//! from a closure that defines its return value, and [`ready`](ready()),
+//! which constructs a future with an immediate defined value.
#[cfg(feature = "alloc")]
pub use futures_core::future::{BoxFuture, LocalBoxFuture};
@@ -28,7 +33,7 @@ pub use self::future::CatchUnwind;
pub use self::future::{Remote, RemoteHandle};
#[cfg(feature = "std")]
-pub use self::future::Shared;
+pub use self::future::{Shared, WeakShared};
mod try_future;
pub use self::try_future::{
diff --git a/src/future/option.rs b/src/future/option.rs
index 88be009..85939d6 100644
--- a/src/future/option.rs
+++ b/src/future/option.rs
@@ -3,29 +3,33 @@
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// A future representing a value which may or may not be present.
-///
-/// Created by the [`From`] implementation for [`Option`](std::option::Option).
-///
-/// # Examples
-///
-/// ```
-/// # futures::executor::block_on(async {
-/// use futures::future::OptionFuture;
-///
-/// let mut a: OptionFuture<_> = Some(async { 123 }).into();
-/// assert_eq!(a.await, Some(123));
-///
-/// a = None.into();
-/// assert_eq!(a.await, None);
-/// # });
-/// ```
-#[pin_project]
-#[derive(Debug, Clone)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct OptionFuture<F>(#[pin] Option<F>);
+pin_project! {
+ /// A future representing a value which may or may not be present.
+ ///
+ /// Created by the [`From`] implementation for [`Option`](std::option::Option).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::future::OptionFuture;
+ ///
+ /// let mut a: OptionFuture<_> = Some(async { 123 }).into();
+ /// assert_eq!(a.await, Some(123));
+ ///
+ /// a = None.into();
+ /// assert_eq!(a.await, None);
+ /// # });
+ /// ```
+ #[derive(Debug, Clone)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct OptionFuture<F> {
+ #[pin]
+ inner: Option<F>,
+ }
+}
impl<F: Future> Future for OptionFuture<F> {
type Output = Option<F::Output>;
@@ -34,7 +38,7 @@ impl<F: Future> Future for OptionFuture<F> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- match self.project().0.as_pin_mut() {
+ match self.project().inner.as_pin_mut() {
Some(x) => x.poll(cx).map(Some),
None => Poll::Ready(None),
}
@@ -43,7 +47,7 @@ impl<F: Future> Future for OptionFuture<F> {
impl<F: FusedFuture> FusedFuture for OptionFuture<F> {
fn is_terminated(&self) -> bool {
- match &self.0 {
+ match &self.inner {
Some(x) => x.is_terminated(),
None => true,
}
@@ -52,6 +56,6 @@ impl<F: FusedFuture> FusedFuture for OptionFuture<F> {
impl<T> From<Option<T>> for OptionFuture<T> {
fn from(option: Option<T>) -> Self {
- OptionFuture(option)
+ Self { inner: option }
}
}
diff --git a/src/future/try_future/into_future.rs b/src/future/try_future/into_future.rs
index 240bb1b..e88d603 100644
--- a/src/future/try_future/into_future.rs
+++ b/src/future/try_future/into_future.rs
@@ -1,23 +1,27 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`into_future`](super::TryFutureExt::into_future) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct IntoFuture<Fut>(#[pin] Fut);
+pin_project! {
+ /// Future for the [`into_future`](super::TryFutureExt::into_future) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct IntoFuture<Fut> {
+ #[pin]
+ future: Fut,
+ }
+}
impl<Fut> IntoFuture<Fut> {
#[inline]
- pub(crate) fn new(future: Fut) -> IntoFuture<Fut> {
- IntoFuture(future)
+ pub(crate) fn new(future: Fut) -> Self {
+ Self { future }
}
}
impl<Fut: TryFuture + FusedFuture> FusedFuture for IntoFuture<Fut> {
- fn is_terminated(&self) -> bool { self.0.is_terminated() }
+ fn is_terminated(&self) -> bool { self.future.is_terminated() }
}
impl<Fut: TryFuture> Future for IntoFuture<Fut> {
@@ -28,6 +32,6 @@ impl<Fut: TryFuture> Future for IntoFuture<Fut> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
- self.project().0.try_poll(cx)
+ self.project().future.try_poll(cx)
}
}
diff --git a/src/future/try_future/try_flatten.rs b/src/future/try_future/try_flatten.rs
index 2bcadc5..5241b27 100644
--- a/src/future/try_future/try_flatten.rs
+++ b/src/future/try_future/try_flatten.rs
@@ -1,22 +1,25 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-#[pin_project(project = TryFlattenProj)]
-#[derive(Debug)]
-pub enum TryFlatten<Fut1, Fut2> {
- First(#[pin] Fut1),
- Second(#[pin] Fut2),
- Empty,
+pin_project! {
+ #[project = TryFlattenProj]
+ #[derive(Debug)]
+ pub enum TryFlatten<Fut1, Fut2> {
+ First { #[pin] f: Fut1 },
+ Second { #[pin] f: Fut2 },
+ Empty,
+ }
}
impl<Fut1, Fut2> TryFlatten<Fut1, Fut2> {
pub(crate) fn new(future: Fut1) -> Self {
- TryFlatten::First(future)
+ Self::First { f: future }
}
}
@@ -26,7 +29,7 @@ impl<Fut> FusedFuture for TryFlatten<Fut, Fut::Ok>
{
fn is_terminated(&self) -> bool {
match self {
- TryFlatten::Empty => true,
+ Self::Empty => true,
_ => false,
}
}
@@ -41,18 +44,18 @@ impl<Fut> Future for TryFlatten<Fut, Fut::Ok>
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
match self.as_mut().project() {
- TryFlattenProj::First(f) => {
+ TryFlattenProj::First { f } => {
match ready!(f.try_poll(cx)) {
- Ok(f) => self.set(TryFlatten::Second(f)),
+ Ok(f) => self.set(Self::Second { f }),
Err(e) => {
- self.set(TryFlatten::Empty);
+ self.set(Self::Empty);
break Err(e);
}
}
},
- TryFlattenProj::Second(f) => {
+ TryFlattenProj::Second { f } => {
let output = ready!(f.try_poll(cx));
- self.set(TryFlatten::Empty);
+ self.set(Self::Empty);
break output;
},
TryFlattenProj::Empty => panic!("TryFlatten polled after completion"),
@@ -67,7 +70,7 @@ impl<Fut> FusedStream for TryFlatten<Fut, Fut::Ok>
{
fn is_terminated(&self) -> bool {
match self {
- TryFlatten::Empty => true,
+ Self::Empty => true,
_ => false,
}
}
@@ -82,19 +85,19 @@ impl<Fut> Stream for TryFlatten<Fut, Fut::Ok>
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
match self.as_mut().project() {
- TryFlattenProj::First(f) => {
+ TryFlattenProj::First { f } => {
match ready!(f.try_poll(cx)) {
- Ok(f) => self.set(TryFlatten::Second(f)),
+ Ok(f) => self.set(Self::Second { f }),
Err(e) => {
- self.set(TryFlatten::Empty);
+ self.set(Self::Empty);
break Some(Err(e));
}
}
},
- TryFlattenProj::Second(f) => {
+ TryFlattenProj::Second { f } => {
let output = ready!(f.try_poll_next(cx));
if output.is_none() {
- self.set(TryFlatten::Empty);
+ self.set(Self::Empty);
}
break output;
},
@@ -119,16 +122,16 @@ where
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(loop {
match self.as_mut().project() {
- TryFlattenProj::First(f) => {
+ TryFlattenProj::First { f } => {
match ready!(f.try_poll(cx)) {
- Ok(f) => self.set(TryFlatten::Second(f)),
+ Ok(f) => self.set(Self::Second { f }),
Err(e) => {
- self.set(TryFlatten::Empty);
+ self.set(Self::Empty);
break Err(e);
}
}
},
- TryFlattenProj::Second(f) => {
+ TryFlattenProj::Second { f } => {
break ready!(f.poll_ready(cx));
},
TryFlattenProj::Empty => panic!("poll_ready called after eof"),
@@ -138,16 +141,16 @@ where
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
match self.project() {
- TryFlattenProj::First(_) => panic!("poll_ready not called first"),
- TryFlattenProj::Second(f) => f.start_send(item),
+ TryFlattenProj::First { .. } => panic!("poll_ready not called first"),
+ TryFlattenProj::Second { f } => f.start_send(item),
TryFlattenProj::Empty => panic!("start_send called after eof"),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
- TryFlattenProj::First(_) => Poll::Ready(Ok(())),
- TryFlattenProj::Second(f) => f.poll_flush(cx),
+ TryFlattenProj::First { .. } => Poll::Ready(Ok(())),
+ TryFlattenProj::Second { f } => f.poll_flush(cx),
TryFlattenProj::Empty => panic!("poll_flush called after eof"),
}
}
@@ -157,11 +160,11 @@ where
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let res = match self.as_mut().project() {
- TryFlattenProj::Second(f) => f.poll_close(cx),
+ TryFlattenProj::Second { f } => f.poll_close(cx),
_ => Poll::Ready(Ok(())),
};
if res.is_ready() {
- self.set(TryFlatten::Empty);
+ self.set(Self::Empty);
}
res
}
diff --git a/src/future/try_future/try_flatten_err.rs b/src/future/try_future/try_flatten_err.rs
index 480f8c3..2e67f11 100644
--- a/src/future/try_future/try_flatten_err.rs
+++ b/src/future/try_future/try_flatten_err.rs
@@ -1,19 +1,22 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::ready;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-#[pin_project(project = TryFlattenErrProj)]
-#[derive(Debug)]
-pub enum TryFlattenErr<Fut1, Fut2> {
- First(#[pin] Fut1),
- Second(#[pin] Fut2),
- Empty,
+pin_project! {
+ #[project = TryFlattenErrProj]
+ #[derive(Debug)]
+ pub enum TryFlattenErr<Fut1, Fut2> {
+ First { #[pin] f: Fut1 },
+ Second { #[pin] f: Fut2 },
+ Empty,
+ }
}
impl<Fut1, Fut2> TryFlattenErr<Fut1, Fut2> {
pub(crate) fn new(future: Fut1) -> Self {
- TryFlattenErr::First(future)
+ Self::First { f: future }
}
}
@@ -23,7 +26,7 @@ impl<Fut> FusedFuture for TryFlattenErr<Fut, Fut::Error>
{
fn is_terminated(&self) -> bool {
match self {
- TryFlattenErr::Empty => true,
+ Self::Empty => true,
_ => false,
}
}
@@ -38,18 +41,18 @@ impl<Fut> Future for TryFlattenErr<Fut, Fut::Error>
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(loop {
match self.as_mut().project() {
- TryFlattenErrProj::First(f) => {
+ TryFlattenErrProj::First { f } => {
match ready!(f.try_poll(cx)) {
- Err(f) => self.set(TryFlattenErr::Second(f)),
+ Err(f) => self.set(Self::Second { f }),
Ok(e) => {
- self.set(TryFlattenErr::Empty);
+ self.set(Self::Empty);
break Ok(e);
}
}
},
- TryFlattenErrProj::Second(f) => {
+ TryFlattenErrProj::Second { f } => {
let output = ready!(f.try_poll(cx));
- self.set(TryFlattenErr::Empty);
+ self.set(Self::Empty);
break output;
},
TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"),
diff --git a/src/future/try_join.rs b/src/future/try_join.rs
index b4a3b98..25ccdde 100644
--- a/src/future/try_join.rs
+++ b/src/future/try_join.rs
@@ -5,19 +5,20 @@ use core::fmt;
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
macro_rules! generate {
($(
$(#[$doc:meta])*
($Join:ident, <Fut1, $($Fut:ident),*>),
)*) => ($(
- $(#[$doc])*
- #[pin_project]
- #[must_use = "futures do nothing unless you `.await` or poll them"]
- pub struct $Join<Fut1: TryFuture, $($Fut: TryFuture),*> {
- #[pin] Fut1: TryMaybeDone<Fut1>,
- $(#[pin] $Fut: TryMaybeDone<$Fut>,)*
+ pin_project! {
+ $(#[$doc])*
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct $Join<Fut1: TryFuture, $($Fut: TryFuture),*> {
+ #[pin] Fut1: TryMaybeDone<Fut1>,
+ $(#[pin] $Fut: TryMaybeDone<$Fut>,)*
+ }
}
impl<Fut1, $($Fut),*> fmt::Debug for $Join<Fut1, $($Fut),*>
@@ -46,8 +47,8 @@ macro_rules! generate {
$Fut: TryFuture<Error=Fut1::Error>
),*
{
- fn new(Fut1: Fut1, $($Fut: $Fut),*) -> $Join<Fut1, $($Fut),*> {
- $Join {
+ fn new(Fut1: Fut1, $($Fut: $Fut),*) -> Self {
+ Self {
Fut1: try_maybe_done(Fut1),
$($Fut: try_maybe_done($Fut)),*
}
diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs
index b38b038..90067e9 100644
--- a/src/future/try_maybe_done.rs
+++ b/src/future/try_maybe_done.rs
@@ -1,18 +1,18 @@
//! Definition of the TryMaybeDone combinator
+use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::ready;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
/// A future that may have completed with an error.
///
/// This is created by the [`try_maybe_done()`] function.
-#[pin_project(project = TryMaybeDoneProj, project_replace = TryMaybeDoneProjOwn)]
#[derive(Debug)]
pub enum TryMaybeDone<Fut: TryFuture> {
/// A not-yet-completed future
- Future(#[pin] Fut),
+ Future(/* #[pin] */ Fut),
/// The output of the completed future
Done(Fut::Ok),
/// The empty variant after the result of a [`TryMaybeDone`] has been
@@ -21,6 +21,8 @@ pub enum TryMaybeDone<Fut: TryFuture> {
Gone,
}
+impl<Fut: TryFuture + Unpin> Unpin for TryMaybeDone<Fut> {}
+
/// Wraps a future into a `TryMaybeDone`
pub fn try_maybe_done<Fut: TryFuture>(future: Fut) -> TryMaybeDone<Fut> {
TryMaybeDone::Future(future)
@@ -33,9 +35,11 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> {
/// has not yet been called.
#[inline]
pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Ok> {
- match self.project() {
- TryMaybeDoneProj::Done(res) => Some(res),
- _ => None,
+ unsafe {
+ match self.get_unchecked_mut() {
+ TryMaybeDone::Done(res) => Some(res),
+ _ => None,
+ }
}
}
@@ -44,12 +48,14 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> {
#[inline]
pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> {
match &*self {
- TryMaybeDone::Done(_) => {},
- TryMaybeDone::Future(_) | TryMaybeDone::Gone => return None,
+ Self::Done(_) => {},
+ Self::Future(_) | Self::Gone => return None,
}
- match self.project_replace(TryMaybeDone::Gone) {
- TryMaybeDoneProjOwn::Done(output) => Some(output),
- _ => unreachable!()
+ unsafe {
+ match mem::replace(self.get_unchecked_mut(), Self::Gone) {
+ TryMaybeDone::Done(output) => Some(output),
+ _ => unreachable!()
+ }
}
}
}
@@ -57,8 +63,8 @@ impl<Fut: TryFuture> TryMaybeDone<Fut> {
impl<Fut: TryFuture> FusedFuture for TryMaybeDone<Fut> {
fn is_terminated(&self) -> bool {
match self {
- TryMaybeDone::Future(_) => false,
- TryMaybeDone::Done(_) | TryMaybeDone::Gone => true,
+ Self::Future(_) => false,
+ Self::Done(_) | Self::Gone => true,
}
}
}
@@ -67,18 +73,20 @@ impl<Fut: TryFuture> Future for TryMaybeDone<Fut> {
type Output = Result<(), Fut::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.as_mut().project() {
- TryMaybeDoneProj::Future(f) => {
- match ready!(f.try_poll(cx)) {
- Ok(res) => self.set(TryMaybeDone::Done(res)),
- Err(e) => {
- self.set(TryMaybeDone::Gone);
- return Poll::Ready(Err(e));
+ unsafe {
+ match self.as_mut().get_unchecked_mut() {
+ TryMaybeDone::Future(f) => {
+ match ready!(Pin::new_unchecked(f).try_poll(cx)) {
+ Ok(res) => self.set(Self::Done(res)),
+ Err(e) => {
+ self.set(Self::Gone);
+ return Poll::Ready(Err(e));
+ }
}
- }
- },
- TryMaybeDoneProj::Done(_) => {},
- TryMaybeDoneProj::Gone => panic!("TryMaybeDone polled after value taken"),
+ },
+ TryMaybeDone::Done(_) => {},
+ TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"),
+ }
}
Poll::Ready(Ok(()))
}
diff --git a/src/io/allow_std.rs b/src/io/allow_std.rs
index 346e9ba..9aa8eb4 100644
--- a/src/io/allow_std.rs
+++ b/src/io/allow_std.rs
@@ -41,7 +41,7 @@ macro_rules! try_with_interrupt {
impl<T> AllowStdIo<T> {
/// Creates a new `AllowStdIo` from an existing IO object.
pub fn new(io: T) -> Self {
- AllowStdIo(io)
+ Self(io)
}
/// Returns a reference to the contained IO object.
diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs
index 2755667..270a086 100644
--- a/src/io/buf_reader.rs
+++ b/src/io/buf_reader.rs
@@ -1,39 +1,41 @@
+use futures_core::ready;
use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use std::io::{self, Read};
use std::pin::Pin;
use std::{cmp, fmt};
use super::DEFAULT_BUF_SIZE;
-/// The `BufReader` struct adds buffering to any reader.
-///
-/// It can be excessively inefficient to work directly with a [`AsyncRead`]
-/// instance. A `BufReader` performs large, infrequent reads on the underlying
-/// [`AsyncRead`] and maintains an in-memory buffer of the results.
-///
-/// `BufReader` can improve the speed of programs that make *small* and
-/// *repeated* read calls to the same file or network socket. It does not
-/// help when reading very large amounts at once, or reading just one or a few
-/// times. It also provides no advantage when reading from a source that is
-/// already in memory, like a `Vec<u8>`.
-///
-/// When the `BufReader` is dropped, the contents of its buffer will be
-/// discarded. Creating multiple instances of a `BufReader` on the same
-/// stream can cause data loss.
-///
-/// [`AsyncRead`]: futures_io::AsyncRead
-///
-// TODO: Examples
-#[pin_project]
-pub struct BufReader<R> {
- #[pin]
- inner: R,
- buffer: Box<[u8]>,
- pos: usize,
- cap: usize,
+pin_project! {
+ /// The `BufReader` struct adds buffering to any reader.
+ ///
+ /// It can be excessively inefficient to work directly with a [`AsyncRead`]
+ /// instance. A `BufReader` performs large, infrequent reads on the underlying
+ /// [`AsyncRead`] and maintains an in-memory buffer of the results.
+ ///
+ /// `BufReader` can improve the speed of programs that make *small* and
+ /// *repeated* read calls to the same file or network socket. It does not
+ /// help when reading very large amounts at once, or reading just one or a few
+ /// times. It also provides no advantage when reading from a source that is
+ /// already in memory, like a `Vec<u8>`.
+ ///
+ /// When the `BufReader` is dropped, the contents of its buffer will be
+ /// discarded. Creating multiple instances of a `BufReader` on the same
+ /// stream can cause data loss.
+ ///
+ /// [`AsyncRead`]: futures_io::AsyncRead
+ ///
+ // TODO: Examples
+ pub struct BufReader<R> {
+ #[pin]
+ inner: R,
+ buffer: Box<[u8]>,
+ pos: usize,
+ cap: usize,
+ }
}
impl<R: AsyncRead> BufReader<R> {
diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs
index ed9196d..991a365 100644
--- a/src/io/buf_writer.rs
+++ b/src/io/buf_writer.rs
@@ -1,38 +1,40 @@
+use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, SeekFrom};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use std::fmt;
use std::io::{self, Write};
use std::pin::Pin;
use super::DEFAULT_BUF_SIZE;
-/// Wraps a writer and buffers its output.
-///
-/// It can be excessively inefficient to work directly with something that
-/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and
-/// writes it to an underlying writer in large, infrequent batches.
-///
-/// `BufWriter` can improve the speed of programs that make *small* and
-/// *repeated* write calls to the same file or network socket. It does not
-/// help when writing very large amounts at once, or writing just one or a few
-/// times. It also provides no advantage when writing to a destination that is
-/// in memory, like a `Vec<u8>`.
-///
-/// When the `BufWriter` is dropped, the contents of its buffer will be
-/// discarded. Creating multiple instances of a `BufWriter` on the same
-/// stream can cause data loss. If you need to write out the contents of its
-/// buffer, you must manually call flush before the writer is dropped.
-///
-/// [`AsyncWrite`]: futures_io::AsyncWrite
-/// [`flush`]: super::AsyncWriteExt::flush
-///
-// TODO: Examples
-#[pin_project]
-pub struct BufWriter<W> {
- #[pin]
- inner: W,
- buf: Vec<u8>,
- written: usize,
+pin_project! {
+ /// Wraps a writer and buffers its output.
+ ///
+ /// It can be excessively inefficient to work directly with something that
+ /// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and
+ /// writes it to an underlying writer in large, infrequent batches.
+ ///
+ /// `BufWriter` can improve the speed of programs that make *small* and
+ /// *repeated* write calls to the same file or network socket. It does not
+ /// help when writing very large amounts at once, or writing just one or a few
+ /// times. It also provides no advantage when writing to a destination that is
+ /// in memory, like a `Vec<u8>`.
+ ///
+ /// When the `BufWriter` is dropped, the contents of its buffer will be
+ /// discarded. Creating multiple instances of a `BufWriter` on the same
+ /// stream can cause data loss. If you need to write out the contents of its
+ /// buffer, you must manually call flush before the writer is dropped.
+ ///
+ /// [`AsyncWrite`]: futures_io::AsyncWrite
+ /// [`flush`]: super::AsyncWriteExt::flush
+ ///
+ // TODO: Examples
+ pub struct BufWriter<W> {
+ #[pin]
+ inner: W,
+ buf: Vec<u8>,
+ written: usize,
+ }
}
impl<W: AsyncWrite> BufWriter<W> {
diff --git a/src/io/chain.rs b/src/io/chain.rs
index 336307f..1b6a335 100644
--- a/src/io/chain.rs
+++ b/src/io/chain.rs
@@ -1,21 +1,23 @@
+use futures_core::ready;
use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use std::fmt;
use std::io;
use std::pin::Pin;
-/// Reader for the [`chain`](super::AsyncReadExt::chain) method.
-#[pin_project]
-#[must_use = "readers do nothing unless polled"]
-pub struct Chain<T, U> {
- #[pin]
- first: T,
- #[pin]
- second: U,
- done_first: bool,
+pin_project! {
+ /// Reader for the [`chain`](super::AsyncReadExt::chain) method.
+ #[must_use = "readers do nothing unless polled"]
+ pub struct Chain<T, U> {
+ #[pin]
+ first: T,
+ #[pin]
+ second: U,
+ done_first: bool,
+ }
}
impl<T, U> Chain<T, U>
diff --git a/src/io/close.rs b/src/io/close.rs
index 4d56696..b944592 100644
--- a/src/io/close.rs
+++ b/src/io/close.rs
@@ -15,7 +15,7 @@ impl<W: ?Sized + Unpin> Unpin for Close<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> Close<'a, W> {
pub(super) fn new(writer: &'a mut W) -> Self {
- Close { writer }
+ Self { writer }
}
}
diff --git a/src/io/copy.rs b/src/io/copy.rs
index 491a680..bc59255 100644
--- a/src/io/copy.rs
+++ b/src/io/copy.rs
@@ -4,7 +4,7 @@ use futures_io::{AsyncRead, AsyncWrite};
use std::io;
use std::pin::Pin;
use super::{BufReader, copy_buf, CopyBuf};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
/// Creates a future which copies all the bytes from one object to another.
///
@@ -41,13 +41,14 @@ where
}
}
-/// Future for the [`copy()`] function.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct Copy<'a, R, W: ?Sized> {
- #[pin]
- inner: CopyBuf<'a, BufReader<R>, W>,
+pin_project! {
+ /// Future for the [`copy()`] function.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Copy<'a, R, W: ?Sized> {
+ #[pin]
+ inner: CopyBuf<'a, BufReader<R>, W>,
+ }
}
impl<R: AsyncRead, W: AsyncWrite + Unpin + ?Sized> Future for Copy<'_, R, W> {
diff --git a/src/io/copy_buf.rs b/src/io/copy_buf.rs
index f47144a..6adf594 100644
--- a/src/io/copy_buf.rs
+++ b/src/io/copy_buf.rs
@@ -1,9 +1,10 @@
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncWrite};
use std::io;
use std::pin::Pin;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
/// Creates a future which copies all the bytes from one object to another.
///
@@ -42,15 +43,16 @@ where
}
}
-/// Future for the [`copy_buf()`] function.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct CopyBuf<'a, R, W: ?Sized> {
- #[pin]
- reader: R,
- writer: &'a mut W,
- amt: u64,
+pin_project! {
+ /// Future for the [`copy_buf()`] function.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct CopyBuf<'a, R, W: ?Sized> {
+ #[pin]
+ reader: R,
+ writer: &'a mut W,
+ amt: u64,
+ }
}
impl<R, W> Future for CopyBuf<'_, R, W>
diff --git a/src/io/cursor.rs b/src/io/cursor.rs
index d135923..b11dbf5 100644
--- a/src/io/cursor.rs
+++ b/src/io/cursor.rs
@@ -42,8 +42,8 @@ impl<T> Cursor<T> {
/// # fn force_inference(_: &Cursor<Vec<u8>>) {}
/// # force_inference(&buff);
/// ```
- pub fn new(inner: T) -> Cursor<T> {
- Cursor {
+ pub fn new(inner: T) -> Self {
+ Self {
inner: io::Cursor::new(inner),
}
}
diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs
index 015547e..6fb3ec7 100644
--- a/src/io/fill_buf.rs
+++ b/src/io/fill_buf.rs
@@ -15,7 +15,7 @@ impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> {
pub(super) fn new(reader: &'a mut R) -> Self {
- FillBuf { reader: Some(reader) }
+ Self { reader: Some(reader) }
}
}
diff --git a/src/io/flush.rs b/src/io/flush.rs
index 70b867a..ece0a7c 100644
--- a/src/io/flush.rs
+++ b/src/io/flush.rs
@@ -15,7 +15,7 @@ impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> {
pub(super) fn new(writer: &'a mut W) -> Self {
- Flush { writer }
+ Self { writer }
}
}
diff --git a/src/io/into_sink.rs b/src/io/into_sink.rs
index 082c581..885ba2f 100644
--- a/src/io/into_sink.rs
+++ b/src/io/into_sink.rs
@@ -1,9 +1,10 @@
+use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::AsyncWrite;
use futures_sink::Sink;
use std::io;
use std::pin::Pin;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
#[derive(Debug)]
struct Block<Item> {
@@ -11,22 +12,23 @@ struct Block<Item> {
bytes: Item,
}
-/// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
-#[pin_project]
-#[must_use = "sinks do nothing unless polled"]
-#[derive(Debug)]
-#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
-pub struct IntoSink<W, Item> {
- #[pin]
- writer: W,
- /// An outstanding block for us to push into the underlying writer, along with an offset of how
- /// far into this block we have written already.
- buffer: Option<Block<Item>>,
+pin_project! {
+ /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
+ #[must_use = "sinks do nothing unless polled"]
+ #[derive(Debug)]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+ pub struct IntoSink<W, Item> {
+ #[pin]
+ writer: W,
+ // An outstanding block for us to push into the underlying writer, along with an offset of how
+ // far into this block we have written already.
+ buffer: Option<Block<Item>>,
+ }
}
impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
pub(super) fn new(writer: W) -> Self {
- IntoSink { writer, buffer: None }
+ Self { writer, buffer: None }
}
/// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
diff --git a/src/io/lines.rs b/src/io/lines.rs
index 90b993d..6ae7392 100644
--- a/src/io/lines.rs
+++ b/src/io/lines.rs
@@ -1,3 +1,4 @@
+use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_io::AsyncBufRead;
@@ -5,19 +6,19 @@ use std::io;
use std::mem;
use std::pin::Pin;
use super::read_line::read_line_internal;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`lines`](super::AsyncBufReadExt::lines) method.
-
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Lines<R> {
- #[pin]
- reader: R,
- buf: String,
- bytes: Vec<u8>,
- read: usize,
+pin_project! {
+ /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Lines<R> {
+ #[pin]
+ reader: R,
+ buf: String,
+ bytes: Vec<u8>,
+ read: usize,
+ }
}
impl<R: AsyncBufRead> Lines<R> {
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 51ee995..a7e2add 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -1,12 +1,19 @@
-//! IO
+//! Asynchronous I/O.
//!
-//! This module contains a number of functions for working with
-//! `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and `AsyncBufRead` types, including
-//! the `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt`, and `AsyncBufReadExt`
-//! traits which add methods to the `AsyncRead`, `AsyncWrite`, `AsyncSeek`,
-//! and `AsyncBufRead` types.
+//! This module is the asynchronous version of `std::io`. It defines four
+//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
+//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
+//! standard library. However, these traits integrate with the asynchronous
+//! task system, so that if an I/O object isn't ready for reading (or writing),
+//! the thread is not blocked, and instead the current task is queued to be
+//! woken when I/O is ready.
//!
-//! This module is only available when the `io` and `std` features of this
+//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
+//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
+//! for operating with asynchronous I/O objects, including ways to work with
+//! them using futures, streams and sinks.
+//!
+//! This module is only available when the `std` feature of this
//! library is activated, and it is activated by default.
#[cfg(feature = "io-compat")]
diff --git a/src/io/read.rs b/src/io/read.rs
index ea25959..677ba81 100644
--- a/src/io/read.rs
+++ b/src/io/read.rs
@@ -16,7 +16,7 @@ impl<R: ?Sized + Unpin> Unpin for Read<'_, R> {}
impl<'a, R: AsyncRead + ?Sized + Unpin> Read<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
- Read { reader, buf }
+ Self { reader, buf }
}
}
diff --git a/src/io/read_exact.rs b/src/io/read_exact.rs
index a2bbd40..f2e0440 100644
--- a/src/io/read_exact.rs
+++ b/src/io/read_exact.rs
@@ -1,4 +1,5 @@
use crate::io::AsyncRead;
+use futures_core::ready;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use std::io;
@@ -17,7 +18,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadExact<'_, R> {}
impl<'a, R: AsyncRead + ?Sized + Unpin> ReadExact<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
- ReadExact { reader, buf }
+ Self { reader, buf }
}
}
diff --git a/src/io/read_line.rs b/src/io/read_line.rs
index 81d8415..d402c96 100644
--- a/src/io/read_line.rs
+++ b/src/io/read_line.rs
@@ -1,3 +1,4 @@
+use futures_core::ready;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncBufRead;
@@ -38,7 +39,7 @@ pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
read: &mut usize,
) -> Poll<io::Result<usize>> {
let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
- if str::from_utf8(&bytes).is_err() {
+ if str::from_utf8(bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8"))
}))
diff --git a/src/io/read_to_end.rs b/src/io/read_to_end.rs
index 70b0578..7bd2c89 100644
--- a/src/io/read_to_end.rs
+++ b/src/io/read_to_end.rs
@@ -1,4 +1,5 @@
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::AsyncRead;
use std::io;
@@ -27,11 +28,16 @@ impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> {
}
}
-struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }
+struct Guard<'a> {
+ buf: &'a mut Vec<u8>,
+ len: usize,
+}
impl Drop for Guard<'_> {
fn drop(&mut self) {
- unsafe { self.buf.set_len(self.len); }
+ unsafe {
+ self.buf.set_len(self.len);
+ }
}
}
@@ -50,8 +56,10 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
start_len: usize,
) -> Poll<io::Result<usize>> {
- let mut g = Guard { len: buf.len(), buf };
- let ret;
+ let mut g = Guard {
+ len: buf.len(),
+ buf,
+ };
loop {
if g.len == g.buf.len() {
unsafe {
@@ -62,24 +70,24 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
}
}
- match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
- Ok(0) => {
- ret = Poll::Ready(Ok(g.len - start_len));
- break;
- }
- Ok(n) => g.len += n,
- Err(e) => {
- ret = Poll::Ready(Err(e));
- break;
+ let buf = &mut g.buf[g.len..];
+ match ready!(rd.as_mut().poll_read(cx, buf)) {
+ Ok(0) => return Poll::Ready(Ok(g.len - start_len)),
+ Ok(n) => {
+ // We can't allow bogus values from read. If it is too large, the returned vec could have its length
+ // set past its capacity, or if it overflows the vec could be shortened which could create an invalid
+ // string if this is called via read_to_string.
+ assert!(n <= buf.len());
+ g.len += n;
}
+ Err(e) => return Poll::Ready(Err(e)),
}
}
-
- ret
}
impl<A> Future for ReadToEnd<'_, A>
- where A: AsyncRead + ?Sized + Unpin,
+where
+ A: AsyncRead + ?Sized + Unpin,
{
type Output = io::Result<usize>;
diff --git a/src/io/read_to_string.rs b/src/io/read_to_string.rs
index 113fe6a..9242654 100644
--- a/src/io/read_to_string.rs
+++ b/src/io/read_to_string.rs
@@ -1,4 +1,5 @@
use super::read_to_end::read_to_end_internal;
+use futures_core::ready;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncRead;
@@ -38,7 +39,7 @@ fn read_to_string_internal<R: AsyncRead + ?Sized>(
start_len: usize,
) -> Poll<io::Result<usize>> {
let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len));
- if str::from_utf8(&bytes).is_err() {
+ if str::from_utf8(bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
diff --git a/src/io/read_until.rs b/src/io/read_until.rs
index 95c47e0..72b59ea 100644
--- a/src/io/read_until.rs
+++ b/src/io/read_until.rs
@@ -1,4 +1,5 @@
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::AsyncBufRead;
use std::io;
diff --git a/src/io/repeat.rs b/src/io/repeat.rs
index 84abd7f..4cefcb2 100644
--- a/src/io/repeat.rs
+++ b/src/io/repeat.rs
@@ -1,3 +1,4 @@
+use futures_core::ready;
use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
diff --git a/src/io/split.rs b/src/io/split.rs
index ccddd04..185c21c 100644
--- a/src/io/split.rs
+++ b/src/io/split.rs
@@ -1,4 +1,5 @@
use crate::lock::BiLock;
+use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
use core::fmt;
diff --git a/src/io/take.rs b/src/io/take.rs
index 6179486..687a697 100644
--- a/src/io/take.rs
+++ b/src/io/take.rs
@@ -1,20 +1,22 @@
+use futures_core::ready;
use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
use futures_io::{AsyncRead, AsyncBufRead};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use std::{cmp, io};
use std::pin::Pin;
-/// Reader for the [`take`](super::AsyncReadExt::take) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "readers do nothing unless you `.await` or poll them"]
-pub struct Take<R> {
- #[pin]
- inner: R,
- // Add '_' to avoid conflicts with `limit` method.
- limit_: u64,
+pin_project! {
+ /// Reader for the [`take`](super::AsyncReadExt::take) method.
+ #[derive(Debug)]
+ #[must_use = "readers do nothing unless you `.await` or poll them"]
+ pub struct Take<R> {
+ #[pin]
+ inner: R,
+ // Add '_' to avoid conflicts with `limit` method.
+ limit_: u64,
+ }
}
impl<R: AsyncRead> Take<R> {
diff --git a/src/io/write_all.rs b/src/io/write_all.rs
index f9ffb49..b134bf1 100644
--- a/src/io/write_all.rs
+++ b/src/io/write_all.rs
@@ -1,4 +1,5 @@
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::AsyncWrite;
use std::io;
@@ -17,7 +18,7 @@ impl<W: ?Sized + Unpin> Unpin for WriteAll<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAll<'a, W> {
pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self {
- WriteAll { writer, buf }
+ Self { writer, buf }
}
}
diff --git a/src/io/write_all_vectored.rs b/src/io/write_all_vectored.rs
index ec28798..380604d 100644
--- a/src/io/write_all_vectored.rs
+++ b/src/io/write_all_vectored.rs
@@ -1,3 +1,4 @@
+use futures_core::ready;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncWrite;
@@ -19,7 +20,7 @@ impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {}
impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> {
pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self {
- WriteAllVectored { writer, bufs: IoSlice::advance(bufs, 0) }
+ Self { writer, bufs: IoSlice::advance(bufs, 0) }
}
}
@@ -186,7 +187,7 @@ mod tests {
(vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2]), IoSlice::new(&[3, 3, 3])], &[1, 1, 1, 2, 2, 2, 3, 3, 3]),
];
- for (mut input, wanted) in tests.into_iter() {
+ for (mut input, wanted) in tests {
let mut dst = test_writer(2, 2);
{
let mut future = dst.write_all_vectored(&mut *input);
diff --git a/src/lib.rs b/src/lib.rs
index 1cf165c..44823cc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,22 +4,17 @@
#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
-
#![cfg_attr(not(feature = "std"), no_std)]
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
+#![warn(
+ missing_docs,
+ missing_debug_implementations,
+ rust_2018_idioms,
+ unreachable_pub
+)]
// It cannot be included in the published code because this lints have false positives in the minimum required version.
#![cfg_attr(test, warn(single_use_lifetimes))]
#![warn(clippy::all)]
-
-// mem::take requires Rust 1.40, matches! requires Rust 1.42
-// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941
-// get's implemented.
-#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)]
-
#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
-
-#![doc(html_root_url = "https://docs.rs/futures-util/0.3.7")]
-
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
@@ -34,9 +29,6 @@ compile_error!("The `read-initializer` feature requires the `unstable` feature a
#[cfg(feature = "alloc")]
extern crate alloc;
-#[macro_use(ready)]
-extern crate futures_core;
-
// Macro re-exports
pub use futures_core::ready;
pub use pin_utils::pin_mut;
@@ -54,7 +46,7 @@ pub use self::async_await::*;
pub mod __private {
pub use crate::*;
pub use core::{
- option::Option::{self, Some, None},
+ option::Option::{self, None, Some},
pin::Pin,
result::Result::{Err, Ok},
};
@@ -81,10 +73,7 @@ macro_rules! delegate_sink {
self.project().$field.poll_ready(cx)
}
- fn start_send(
- self: core::pin::Pin<&mut Self>,
- item: $item,
- ) -> Result<(), Self::Error> {
+ fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
self.project().$field.start_send(item)
}
@@ -101,7 +90,7 @@ macro_rules! delegate_sink {
) -> core::task::Poll<Result<(), Self::Error>> {
self.project().$field.poll_close(cx)
}
- }
+ };
}
macro_rules! delegate_future {
@@ -112,7 +101,7 @@ macro_rules! delegate_future {
) -> core::task::Poll<Self::Output> {
self.project().$field.poll(cx)
}
- }
+ };
}
macro_rules! delegate_stream {
@@ -126,34 +115,40 @@ macro_rules! delegate_stream {
fn size_hint(&self) -> (usize, Option<usize>) {
self.$field.size_hint()
}
- }
+ };
}
#[cfg(feature = "io")]
#[cfg(feature = "std")]
macro_rules! delegate_async_write {
($field:ident) => {
- fn poll_write(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, buf: &[u8])
- -> core::task::Poll<std::io::Result<usize>>
- {
+ fn poll_write(
+ self: core::pin::Pin<&mut Self>,
+ cx: &mut core::task::Context<'_>,
+ buf: &[u8],
+ ) -> core::task::Poll<std::io::Result<usize>> {
self.project().$field.poll_write(cx, buf)
}
- fn poll_write_vectored(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, bufs: &[std::io::IoSlice<'_>])
- -> core::task::Poll<std::io::Result<usize>>
- {
+ fn poll_write_vectored(
+ self: core::pin::Pin<&mut Self>,
+ cx: &mut core::task::Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> core::task::Poll<std::io::Result<usize>> {
self.project().$field.poll_write_vectored(cx, bufs)
}
- fn poll_flush(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>)
- -> core::task::Poll<std::io::Result<()>>
- {
+ fn poll_flush(
+ self: core::pin::Pin<&mut Self>,
+ cx: &mut core::task::Context<'_>,
+ ) -> core::task::Poll<std::io::Result<()>> {
self.project().$field.poll_flush(cx)
}
- fn poll_close(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>)
- -> core::task::Poll<std::io::Result<()>>
- {
+ fn poll_close(
+ self: core::pin::Pin<&mut Self>,
+ cx: &mut core::task::Context<'_>,
+ ) -> core::task::Poll<std::io::Result<()>> {
self.project().$field.poll_close(cx)
}
- }
+ };
}
#[cfg(feature = "io")]
@@ -165,18 +160,22 @@ macro_rules! delegate_async_read {
self.$field.initializer()
}
- fn poll_read(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, buf: &mut [u8])
- -> core::task::Poll<std::io::Result<usize>>
- {
+ fn poll_read(
+ self: core::pin::Pin<&mut Self>,
+ cx: &mut core::task::Context<'_>,
+ buf: &mut [u8],
+ ) -> core::task::Poll<std::io::Result<usize>> {
self.project().$field.poll_read(cx, buf)
}
- fn poll_read_vectored(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, bufs: &mut [std::io::IoSliceMut<'_>])
- -> core::task::Poll<std::io::Result<usize>>
- {
+ fn poll_read_vectored(
+ self: core::pin::Pin<&mut Self>,
+ cx: &mut core::task::Context<'_>,
+ bufs: &mut [std::io::IoSliceMut<'_>],
+ ) -> core::task::Poll<std::io::Result<usize>> {
self.project().$field.poll_read_vectored(cx, bufs)
}
- }
+ };
}
#[cfg(feature = "io")]
@@ -193,7 +192,7 @@ macro_rules! delegate_async_buf_read {
fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
self.project().$field.consume(amt)
}
- }
+ };
}
macro_rules! delegate_access_inner {
@@ -289,10 +288,11 @@ macro_rules! delegate_all {
}
};
($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
- #[pin_project::pin_project]
- #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
- $(#[$attr])*
- pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner:$t }
+ pin_project_lite::pin_project! {
+ #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
+ $(#[$attr])*
+ pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t }
+ }
impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* {
$($($item)*)*
@@ -308,16 +308,19 @@ macro_rules! delegate_all {
}
pub mod future;
-#[doc(hidden)] pub use crate::future::{FutureExt, TryFutureExt};
+#[doc(hidden)]
+pub use crate::future::{FutureExt, TryFutureExt};
pub mod stream;
-#[doc(hidden)] pub use crate::stream::{StreamExt, TryStreamExt};
+#[doc(hidden)]
+pub use crate::stream::{StreamExt, TryStreamExt};
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub mod sink;
#[cfg(feature = "sink")]
-#[doc(hidden)] pub use crate::sink::SinkExt;
+#[doc(hidden)]
+pub use crate::sink::SinkExt;
pub mod task;
@@ -333,12 +336,11 @@ pub mod compat;
pub mod io;
#[cfg(feature = "io")]
#[cfg(feature = "std")]
-#[doc(hidden)] pub use crate::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt};
-
-mod fns;
+#[doc(hidden)]
+pub use crate::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
+#[cfg(feature = "alloc")]
+pub mod lock;
-cfg_target_has_atomic! {
- #[cfg(feature = "alloc")]
- pub mod lock;
-}
+mod fns;
+mod unfold_state;
diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs
index 3698406..600e16e 100644
--- a/src/lock/bilock.rs
+++ b/src/lock/bilock.rs
@@ -60,13 +60,13 @@ impl<T> BiLock<T> {
/// will only be available through `Pin<&mut T>` (not `&mut T`) unless `T` is `Unpin`.
/// Similarly, reuniting the lock and extracting the inner value is only
/// possible when `T` is `Unpin`.
- pub fn new(t: T) -> (BiLock<T>, BiLock<T>) {
+ pub fn new(t: T) -> (Self, Self) {
let arc = Arc::new(Inner {
state: AtomicUsize::new(0),
value: Some(UnsafeCell::new(t)),
});
- (BiLock { arc: arc.clone() }, BiLock { arc })
+ (Self { arc: arc.clone() }, Self { arc })
}
/// Attempt to acquire this lock, returning `Pending` if it can't be
diff --git a/src/lock/mod.rs b/src/lock/mod.rs
index b252613..071eef6 100644
--- a/src/lock/mod.rs
+++ b/src/lock/mod.rs
@@ -3,18 +3,20 @@
//! This module is only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.
-#[cfg(feature = "std")]
-mod mutex;
-#[cfg(feature = "std")]
-pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard};
+cfg_target_has_atomic! {
+ #[cfg(feature = "std")]
+ mod mutex;
+ #[cfg(feature = "std")]
+ pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard};
-#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
-#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
-#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
-mod bilock;
-#[cfg(feature = "bilock")]
-#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
-pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
-#[cfg(any(feature = "sink", feature = "io"))]
-#[cfg(not(feature = "bilock"))]
-pub(crate) use self::bilock::BiLock;
+ #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
+ #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
+ mod bilock;
+ #[cfg(feature = "bilock")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
+ pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
+ #[cfg(any(feature = "sink", feature = "io"))]
+ #[cfg(not(feature = "bilock"))]
+ pub(crate) use self::bilock::BiLock;
+}
diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs
index 84aeeda..a78de62 100644
--- a/src/lock/mutex.rs
+++ b/src/lock/mutex.rs
@@ -40,8 +40,8 @@ impl<T> From<T> for Mutex<T> {
}
impl<T: Default> Default for Mutex<T> {
- fn default() -> Mutex<T> {
- Mutex::new(Default::default())
+ fn default() -> Self {
+ Self::new(Default::default())
}
}
@@ -53,15 +53,15 @@ enum Waiter {
impl Waiter {
fn register(&mut self, waker: &Waker) {
match self {
- Waiter::Waiting(w) if waker.will_wake(w) => {},
- _ => *self = Waiter::Waiting(waker.clone()),
+ Self::Waiting(w) if waker.will_wake(w) => {},
+ _ => *self = Self::Waiting(waker.clone()),
}
}
fn wake(&mut self) {
- match mem::replace(self, Waiter::Woken) {
- Waiter::Waiting(waker) => waker.wake(),
- Waiter::Woken => {},
+ match mem::replace(self, Self::Woken) {
+ Self::Waiting(waker) => waker.wake(),
+ Self::Woken => {},
}
}
}
@@ -72,8 +72,8 @@ const HAS_WAITERS: usize = 1 << 1;
impl<T> Mutex<T> {
/// Creates a new futures-aware mutex.
- pub fn new(t: T) -> Mutex<T> {
- Mutex {
+ pub fn new(t: T) -> Self {
+ Self {
state: AtomicUsize::new(0),
waiters: StdMutex::new(Slab::new()),
value: UnsafeCell::new(t),
diff --git a/src/never.rs b/src/never.rs
index 767c5af..e811f97 100644
--- a/src/never.rs
+++ b/src/never.rs
@@ -1,5 +1,6 @@
-//! Definition of the `Never` type,
-//! a stand-in for the `!` type until it becomes stable.
+//! This module contains the `Never` type.
+//!
+//! Values of this type can never be created and will never exist.
/// A type with no possible values.
///
diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs
index 8176abd..8c58f4f 100644
--- a/src/sink/buffer.rs
+++ b/src/sink/buffer.rs
@@ -1,26 +1,28 @@
+use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use core::pin::Pin;
use alloc::collections::VecDeque;
-/// Sink for the [`buffer`](super::SinkExt::buffer) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "sinks do nothing unless polled"]
-pub struct Buffer<Si, Item> {
- #[pin]
- sink: Si,
- buf: VecDeque<Item>,
-
- // Track capacity separately from the `VecDeque`, which may be rounded up
- capacity: usize,
+pin_project! {
+ /// Sink for the [`buffer`](super::SinkExt::buffer) method.
+ #[derive(Debug)]
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct Buffer<Si, Item> {
+ #[pin]
+ sink: Si,
+ buf: VecDeque<Item>,
+
+ // Track capacity separately from the `VecDeque`, which may be rounded up
+ capacity: usize,
+ }
}
impl<Si: Sink<Item>, Item> Buffer<Si, Item> {
pub(super) fn new(sink: Si, capacity: usize) -> Self {
- Buffer {
+ Self {
sink,
buf: VecDeque::with_capacity(capacity),
capacity,
diff --git a/src/sink/close.rs b/src/sink/close.rs
index 1514b41..4421d10 100644
--- a/src/sink/close.rs
+++ b/src/sink/close.rs
@@ -19,7 +19,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Close<'_, Si, Item> {}
/// The sink itself is returned after closeing is complete.
impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Close<'a, Si, Item> {
pub(super) fn new(sink: &'a mut Si) -> Self {
- Close {
+ Self {
sink,
_phantom: PhantomData,
}
diff --git a/src/sink/err_into.rs b/src/sink/err_into.rs
index b23ada7..3eb9940 100644
--- a/src/sink/err_into.rs
+++ b/src/sink/err_into.rs
@@ -1,15 +1,16 @@
use crate::sink::{SinkExt, SinkMapErr};
use futures_core::stream::{Stream, FusedStream};
use futures_sink::{Sink};
-use pin_project::pin_project;
-
-/// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "sinks do nothing unless polled"]
-pub struct SinkErrInto<Si: Sink<Item>, Item, E> {
- #[pin]
- sink: SinkMapErr<Si, fn(Si::Error) -> E>,
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method.
+ #[derive(Debug)]
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct SinkErrInto<Si: Sink<Item>, Item, E> {
+ #[pin]
+ sink: SinkMapErr<Si, fn(Si::Error) -> E>,
+ }
}
impl<Si, E, Item> SinkErrInto<Si, Item, E>
@@ -17,7 +18,7 @@ impl<Si, E, Item> SinkErrInto<Si, Item, E>
Si::Error: Into<E>,
{
pub(super) fn new(sink: Si) -> Self {
- SinkErrInto {
+ Self {
sink: SinkExt::sink_map_err(sink, Into::into),
}
}
diff --git a/src/sink/fanout.rs b/src/sink/fanout.rs
index d71d793..f351e86 100644
--- a/src/sink/fanout.rs
+++ b/src/sink/fanout.rs
@@ -2,24 +2,25 @@ use core::fmt::{Debug, Formatter, Result as FmtResult};
use core::pin::Pin;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::pin_project;
-
-/// Sink that clones incoming items and forwards them to two sinks at the same time.
-///
-/// Backpressure from any downstream sink propagates up, which means that this sink
-/// can only process items as fast as its _slowest_ downstream sink.
-#[pin_project]
-#[must_use = "sinks do nothing unless polled"]
-pub struct Fanout<Si1, Si2> {
- #[pin]
- sink1: Si1,
- #[pin]
- sink2: Si2
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink that clones incoming items and forwards them to two sinks at the same time.
+ ///
+ /// Backpressure from any downstream sink propagates up, which means that this sink
+ /// can only process items as fast as its _slowest_ downstream sink.
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct Fanout<Si1, Si2> {
+ #[pin]
+ sink1: Si1,
+ #[pin]
+ sink2: Si2
+ }
}
impl<Si1, Si2> Fanout<Si1, Si2> {
- pub(super) fn new(sink1: Si1, sink2: Si2) -> Fanout<Si1, Si2> {
- Fanout { sink1, sink2 }
+ pub(super) fn new(sink1: Si1, sink2: Si2) -> Self {
+ Self { sink1, sink2 }
}
/// Get a shared reference to the inner sinks.
diff --git a/src/sink/feed.rs b/src/sink/feed.rs
new file mode 100644
index 0000000..06df9a9
--- /dev/null
+++ b/src/sink/feed.rs
@@ -0,0 +1,49 @@
+use core::pin::Pin;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+
+/// Future for the [`feed`](super::SinkExt::feed) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Feed<'a, Si: ?Sized, Item> {
+ sink: &'a mut Si,
+ item: Option<Item>,
+}
+
+// Pinning is never projected to children
+impl<Si: Unpin + ?Sized, Item> Unpin for Feed<'_, Si, Item> {}
+
+impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> {
+ pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
+ Feed {
+ sink,
+ item: Some(item),
+ }
+ }
+
+ pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
+ Pin::new(self.sink)
+ }
+
+ pub(super) fn is_item_pending(&self) -> bool {
+ self.item.is_some()
+ }
+}
+
+impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> {
+ type Output = Result<(), Si::Error>;
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ let this = self.get_mut();
+ let mut sink = Pin::new(&mut this.sink);
+ ready!(sink.as_mut().poll_ready(cx))?;
+ let item = this.item.take().expect("polled Feed after completion");
+ sink.as_mut().start_send(item)?;
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/src/sink/flush.rs b/src/sink/flush.rs
index 86fecc4..c06a221 100644
--- a/src/sink/flush.rs
+++ b/src/sink/flush.rs
@@ -23,7 +23,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Flush<'_, Si, Item> {}
/// all current requests are processed.
impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Flush<'a, Si, Item> {
pub(super) fn new(sink: &'a mut Si) -> Self {
- Flush {
+ Self {
sink,
_phantom: PhantomData,
}
diff --git a/src/sink/map_err.rs b/src/sink/map_err.rs
index 29994a7..2829344 100644
--- a/src/sink/map_err.rs
+++ b/src/sink/map_err.rs
@@ -2,21 +2,22 @@ use core::pin::Pin;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::{Sink};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
-#[pin_project]
-#[derive(Debug, Clone)]
-#[must_use = "sinks do nothing unless polled"]
-pub struct SinkMapErr<Si, F> {
- #[pin]
- sink: Si,
- f: Option<F>,
+pin_project! {
+ /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
+ #[derive(Debug, Clone)]
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct SinkMapErr<Si, F> {
+ #[pin]
+ sink: Si,
+ f: Option<F>,
+ }
}
impl<Si, F> SinkMapErr<Si, F> {
- pub(super) fn new(sink: Si, f: F) -> SinkMapErr<Si, F> {
- SinkMapErr { sink, f: Some(f) }
+ pub(super) fn new(sink: Si, f: F) -> Self {
+ Self { sink, f: Some(f) }
}
delegate_access_inner!(sink, Si, ());
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
index b0e2c83..1a062d0 100644
--- a/src/sink/mod.rs
+++ b/src/sink/mod.rs
@@ -1,16 +1,16 @@
-//! Sinks
+//! Asynchronous sinks.
//!
-//! This module contains a number of functions for working with `Sink`s,
-//! including the `SinkExt` trait which adds methods to `Sink` types.
+//! This module contains:
//!
-//! This module is only available when the `sink` feature of this
-//! library is activated, and it is activated by default.
+//! - The [`Sink`] trait, which allows you to asynchronously write data.
+//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
+//! sinks.
+use crate::future::Either;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
-use crate::future::Either;
#[cfg(feature = "compat")]
use crate::compat::CompatSink;
@@ -26,6 +26,9 @@ pub use self::drain::{drain, Drain};
mod fanout;
pub use self::fanout::Fanout;
+mod feed;
+pub use self::feed::Feed;
+
mod flush;
pub use self::flush::Flush;
@@ -41,6 +44,9 @@ pub use self::send::Send;
mod send_all;
pub use self::send_all::SendAll;
+mod unfold;
+pub use self::unfold::{unfold, Unfold};
+
mod with;
pub use self::with::With;
@@ -69,10 +75,11 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Note that this function consumes the given sink, returning a wrapped
/// version, much like `Iterator::map`.
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
- where F: FnMut(U) -> Fut,
- Fut: Future<Output = Result<Item, E>>,
- E: From<Self::Error>,
- Self: Sized
+ where
+ F: FnMut(U) -> Fut,
+ Fut: Future<Output = Result<Item, E>>,
+ E: From<Self::Error>,
+ Self: Sized,
{
With::new(self, f)
}
@@ -110,9 +117,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// # });
/// ```
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
- where F: FnMut(U) -> St,
- St: Stream<Item = Result<Item, Self::Error>>,
- Self: Sized
+ where
+ F: FnMut(U) -> St,
+ St: Stream<Item = Result<Item, Self::Error>>,
+ Self: Sized,
{
WithFlatMap::new(self, f)
}
@@ -133,8 +141,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Transforms the error returned by the sink.
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
- where F: FnOnce(Self::Error) -> E,
- Self: Sized,
+ where
+ F: FnOnce(Self::Error) -> E,
+ Self: Sized,
{
SinkMapErr::new(self, f)
}
@@ -143,13 +152,13 @@ pub trait SinkExt<Item>: Sink<Item> {
///
/// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
- where Self: Sized,
- Self::Error: Into<E>,
+ where
+ Self: Sized,
+ Self::Error: Into<E>,
{
SinkErrInto::new(self)
}
-
/// Adds a fixed-size buffer to the current sink.
///
/// The resulting sink will buffer up to `capacity` items when the
@@ -164,14 +173,16 @@ pub trait SinkExt<Item>: Sink<Item> {
/// library is activated, and it is activated by default.
#[cfg(feature = "alloc")]
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
- where Self: Sized,
+ where
+ Self: Sized,
{
Buffer::new(self, capacity)
}
/// Close the sink.
fn close(&mut self) -> Close<'_, Self, Item>
- where Self: Unpin,
+ where
+ Self: Unpin,
{
Close::new(self)
}
@@ -181,9 +192,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This adapter clones each incoming item and forwards it to both this as well as
/// the other sink at the same time.
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
- where Self: Sized,
- Item: Clone,
- Si: Sink<Item, Error=Self::Error>
+ where
+ Self: Sized,
+ Item: Clone,
+ Si: Sink<Item, Error = Self::Error>,
{
Fanout::new(self, other)
}
@@ -193,7 +205,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This adapter is intended to be used when you want to stop sending to the sink
/// until all current requests are processed.
fn flush(&mut self) -> Flush<'_, Self, Item>
- where Self: Unpin,
+ where
+ Self: Unpin,
{
Flush::new(self)
}
@@ -202,14 +215,27 @@ pub trait SinkExt<Item>: Sink<Item> {
/// into the sink, including flushing.
///
/// Note that, **because of the flushing requirement, it is usually better
- /// to batch together items to send via `send_all`, rather than flushing
- /// between each item.**
+ /// to batch together items to send via `feed` or `send_all`,
+ /// rather than flushing between each item.**
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
- where Self: Unpin,
+ where
+ Self: Unpin,
{
Send::new(self, item)
}
+ /// A future that completes after the given item has been received
+ /// by the sink.
+ ///
+ /// Unlike `send`, the returned future does not flush the sink.
+ /// It is the caller's responsibility to ensure all pending items
+ /// are processed, which can be done via `flush` or `close`.
+ fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
+ where Self: Unpin,
+ {
+ Feed::new(self, item)
+ }
+
/// A future that completes after the given stream has been fully processed
/// into the sink, including flushing.
///
@@ -221,12 +247,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Doing `sink.send_all(stream)` is roughly equivalent to
/// `stream.forward(sink)`. The returned future will exhaust all items from
/// `stream` and send them to `self`.
- fn send_all<'a, St>(
- &'a mut self,
- stream: &'a mut St
- ) -> SendAll<'a, Self, St>
- where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
- Self: Unpin,
+ fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
+ where
+ St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
+ Self: Unpin,
{
SendAll::new(self, stream)
}
@@ -237,8 +261,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This can be used in combination with the `right_sink` method to write `if`
/// statements that evaluate to different streams in different branches.
fn left_sink<Si2>(self) -> Either<Self, Si2>
- where Si2: Sink<Item, Error = Self::Error>,
- Self: Sized
+ where
+ Si2: Sink<Item, Error = Self::Error>,
+ Self: Sized,
{
Either::Left(self)
}
@@ -249,8 +274,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This can be used in combination with the `left_sink` method to write `if`
/// statements that evaluate to different streams in different branches.
fn right_sink<Si1>(self) -> Either<Si1, Self>
- where Si1: Sink<Item, Error = Self::Error>,
- Self: Sized
+ where
+ Si1: Sink<Item, Error = Self::Error>,
+ Self: Sized,
{
Either::Right(self)
}
@@ -260,15 +286,17 @@ pub trait SinkExt<Item>: Sink<Item> {
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> CompatSink<Self, Item>
- where Self: Sized + Unpin,
+ where
+ Self: Sized + Unpin,
{
CompatSink::new(self)
}
-
+
/// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
/// sink types.
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
- where Self: Unpin
+ where
+ Self: Unpin,
{
Pin::new(self).poll_ready(cx)
}
@@ -276,7 +304,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
/// sink types.
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
- where Self: Unpin
+ where
+ Self: Unpin,
{
Pin::new(self).start_send(item)
}
@@ -284,7 +313,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
/// sink types.
fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
- where Self: Unpin
+ where
+ Self: Unpin,
{
Pin::new(self).poll_flush(cx)
}
@@ -292,7 +322,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
/// sink types.
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
- where Self: Unpin
+ where
+ Self: Unpin,
{
Pin::new(self).poll_close(cx)
}
diff --git a/src/sink/send.rs b/src/sink/send.rs
index dc7f0be..384c22c 100644
--- a/src/sink/send.rs
+++ b/src/sink/send.rs
@@ -1,5 +1,7 @@
+use super::Feed;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
@@ -7,8 +9,7 @@ use futures_sink::Sink;
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, Si: ?Sized, Item> {
- sink: &'a mut Si,
- item: Option<Item>,
+ feed: Feed<'a, Si, Item>,
}
// Pinning is never projected to children
@@ -16,9 +17,8 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {}
impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> {
pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
- Send {
- sink,
- item: Some(item),
+ Self {
+ feed: Feed::new(sink, item),
}
}
}
@@ -31,20 +31,15 @@ impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> {
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = &mut *self;
- if let Some(item) = this.item.take() {
- let mut sink = Pin::new(&mut this.sink);
- match sink.as_mut().poll_ready(cx)? {
- Poll::Ready(()) => sink.as_mut().start_send(item)?,
- Poll::Pending => {
- this.item = Some(item);
- return Poll::Pending;
- }
- }
+
+ if this.feed.is_item_pending() {
+ ready!(Pin::new(&mut this.feed).poll(cx))?;
+ debug_assert!(!this.feed.is_item_pending());
}
// we're done sending the item, but want to block on flushing the
// sink
- ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
+ ready!(this.feed.sink_pin_mut().poll_flush(cx))?;
Poll::Ready(Ok(()))
}
diff --git a/src/sink/send_all.rs b/src/sink/send_all.rs
index 255df4d..6a33459 100644
--- a/src/sink/send_all.rs
+++ b/src/sink/send_all.rs
@@ -2,6 +2,7 @@ use crate::stream::{StreamExt, TryStreamExt, Fuse};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{TryStream, Stream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
@@ -49,8 +50,8 @@ where
pub(super) fn new(
sink: &'a mut Si,
stream: &'a mut St,
- ) -> SendAll<'a, Si, St> {
- SendAll {
+ ) -> Self {
+ Self {
sink,
stream: stream.fuse(),
buffered: None,
diff --git a/src/sink/unfold.rs b/src/sink/unfold.rs
new file mode 100644
index 0000000..1aab200
--- /dev/null
+++ b/src/sink/unfold.rs
@@ -0,0 +1,88 @@
+use crate::unfold_state::UnfoldState;
+use core::{future::Future, pin::Pin};
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Sink for the [`unfold`] function.
+ #[derive(Debug)]
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct Unfold<T, F, R> {
+ function: F,
+ #[pin]
+ state: UnfoldState<T, R>,
+ }
+}
+
+/// Create a sink from a function which processes one item at a time.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::sink::{self, SinkExt};
+///
+/// let unfold = sink::unfold(0, |mut sum, i: i32| {
+/// async move {
+/// sum += i;
+/// eprintln!("{}", i);
+/// Ok::<_, futures::never::Never>(sum)
+/// }
+/// });
+/// futures::pin_mut!(unfold);
+/// unfold.send(5).await?;
+/// # Ok::<(), futures::never::Never>(()) }).unwrap();
+/// ```
+pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
+where
+ F: FnMut(T, Item) -> R,
+ R: Future<Output = Result<T, E>>,
+{
+ Unfold {
+ function,
+ state: UnfoldState::Value { value: init },
+ }
+}
+
+impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
+where
+ F: FnMut(T, Item) -> R,
+ R: Future<Output = Result<T, E>>,
+{
+ type Error = E;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_flush(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ let mut this = self.project();
+ let future = match this.state.as_mut().take_value() {
+ Some(value) => (this.function)(value, item),
+ None => panic!("start_send called without poll_ready being called first"),
+ };
+ this.state.set(UnfoldState::Future { future });
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ let mut this = self.project();
+ Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
+ match ready!(future.poll(cx)) {
+ Ok(state) => {
+ this.state.set(UnfoldState::Value { value: state });
+ Ok(())
+ }
+ Err(err) => Err(err),
+ }
+ } else {
+ Ok(())
+ })
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_flush(cx)
+ }
+}
diff --git a/src/sink/with.rs b/src/sink/with.rs
index 6329a0c..73b87b7 100644
--- a/src/sink/with.rs
+++ b/src/sink/with.rs
@@ -2,21 +2,23 @@ use core::fmt;
use core::marker::PhantomData;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Sink for the [`with`](super::SinkExt::with) method.
-#[pin_project]
-#[must_use = "sinks do nothing unless polled"]
-pub struct With<Si, Item, U, Fut, F> {
- #[pin]
- sink: Si,
- f: F,
- #[pin]
- state: Option<Fut>,
- _phantom: PhantomData<fn(U) -> Item>,
+pin_project! {
+ /// Sink for the [`with`](super::SinkExt::with) method.
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct With<Si, Item, U, Fut, F> {
+ #[pin]
+ sink: Si,
+ f: F,
+ #[pin]
+ state: Option<Fut>,
+ _phantom: PhantomData<fn(U) -> Item>,
+ }
}
impl<Si, Item, U, Fut, F> fmt::Debug for With<Si, Item, U, Fut, F>
@@ -42,7 +44,7 @@ where Si: Sink<Item>,
Fut: Future<Output = Result<Item, E>>,
E: From<Si::Error>,
{
- With {
+ Self {
state: None,
sink,
f,
@@ -51,6 +53,22 @@ where Si: Sink<Item>,
}
}
+impl<Si, Item, U, Fut, F> Clone for With<Si, Item, U, Fut, F>
+where
+ Si: Clone,
+ F: Clone,
+ Fut: Clone,
+{
+ fn clone(&self) -> Self {
+ Self {
+ state: self.state.clone(),
+ sink: self.sink.clone(),
+ f: self.f.clone(),
+ _phantom: PhantomData,
+ }
+ }
+}
+
// Forwarding impl of Stream from the underlying sink
impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F>
where S: Stream + Sink<Item>,
diff --git a/src/sink/with_flat_map.rs b/src/sink/with_flat_map.rs
index cf213e6..4b8d3a2 100644
--- a/src/sink/with_flat_map.rs
+++ b/src/sink/with_flat_map.rs
@@ -1,22 +1,24 @@
use core::fmt;
use core::marker::PhantomData;
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method.
-#[pin_project]
-#[must_use = "sinks do nothing unless polled"]
-pub struct WithFlatMap<Si, Item, U, St, F> {
- #[pin]
- sink: Si,
- f: F,
- #[pin]
- stream: Option<St>,
- buffer: Option<Item>,
- _marker: PhantomData<fn(U)>,
+pin_project! {
+ /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method.
+ #[must_use = "sinks do nothing unless polled"]
+ pub struct WithFlatMap<Si, Item, U, St, F> {
+ #[pin]
+ sink: Si,
+ f: F,
+ #[pin]
+ stream: Option<St>,
+ buffer: Option<Item>,
+ _marker: PhantomData<fn(U)>,
+ }
}
impl<Si, Item, U, St, F> fmt::Debug for WithFlatMap<Si, Item, U, St, F>
@@ -41,7 +43,7 @@ where
St: Stream<Item = Result<Item, Si::Error>>,
{
pub(super) fn new(sink: Si, f: F) -> Self {
- WithFlatMap {
+ Self {
sink,
f,
stream: None,
diff --git a/src/stream/empty.rs b/src/stream/empty.rs
index 903af68..d228b31 100644
--- a/src/stream/empty.rs
+++ b/src/stream/empty.rs
@@ -38,3 +38,9 @@ impl<T> Stream for Empty<T> {
(0, Some(0))
}
}
+
+impl<T> Clone for Empty<T> {
+ fn clone(&self) -> Self {
+ empty()
+ }
+}
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs
index 5dbd4ae..eda3b27 100644
--- a/src/stream/futures_ordered.rs
+++ b/src/stream/futures_ordered.rs
@@ -1,21 +1,26 @@
use crate::stream::{FuturesUnordered, StreamExt};
-use futures_core::future::Future;
-use futures_core::stream::Stream;
-use futures_core::{FusedStream, task::{Context, Poll}};
-use pin_project::pin_project;
+use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
use core::cmp::Ordering;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::pin::Pin;
-use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
-
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-#[derive(Debug)]
-struct OrderWrapper<T> {
- #[pin]
- data: T, // A future or a future's output
- index: usize,
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::stream::Stream;
+use futures_core::{
+ task::{Context, Poll},
+ FusedStream,
+};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ #[derive(Debug)]
+ struct OrderWrapper<T> {
+ #[pin]
+ data: T, // A future or a future's output
+ index: usize,
+ }
}
impl<T> PartialEq for OrderWrapper<T> {
@@ -40,17 +45,17 @@ impl<T> Ord for OrderWrapper<T> {
}
impl<T> Future for OrderWrapper<T>
- where T: Future
+where
+ T: Future,
{
type Output = OrderWrapper<T::Output>;
- fn poll(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Self::Output> {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let index = self.index;
- self.project().data.poll(cx)
- .map(|output| OrderWrapper { data: output, index })
+ self.project().data.poll(cx).map(|output| OrderWrapper {
+ data: output,
+ index,
+ })
}
}
@@ -104,8 +109,8 @@ impl<Fut: Future> FuturesOrdered<Fut> {
///
/// The returned `FuturesOrdered` does not contain any futures and, in this
/// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`.
- pub fn new() -> FuturesOrdered<Fut> {
- FuturesOrdered {
+ pub fn new() -> Self {
+ Self {
in_progress_queue: FuturesUnordered::new(),
queued_outputs: BinaryHeap::new(),
next_incoming_index: 0,
@@ -144,18 +149,15 @@ impl<Fut: Future> FuturesOrdered<Fut> {
}
impl<Fut: Future> Default for FuturesOrdered<Fut> {
- fn default() -> FuturesOrdered<Fut> {
- FuturesOrdered::new()
+ fn default() -> Self {
+ Self::new()
}
}
impl<Fut: Future> Stream for FuturesOrdered<Fut> {
type Item = Fut::Output;
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
// Check to see if we've already received the next value
@@ -198,8 +200,11 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
where
T: IntoIterator<Item = Fut>,
{
- let acc = FuturesOrdered::new();
- iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
+ let acc = Self::new();
+ iter.into_iter().fold(acc, |mut acc, item| {
+ acc.push(item);
+ acc
+ })
}
}
@@ -214,7 +219,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
where
I: IntoIterator<Item = Fut>,
{
- for item in iter.into_iter() {
+ for item in iter {
self.push(item);
}
}
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 2b7d704..37b7d7e 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -122,8 +122,8 @@ impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
// run queue if it isn't inserted already.
impl<Fut> Default for FuturesUnordered<Fut> {
- fn default() -> FuturesUnordered<Fut> {
- FuturesUnordered::new()
+ fn default() -> Self {
+ Self::new()
}
}
@@ -133,7 +133,7 @@ impl<Fut> FuturesUnordered<Fut> {
/// The returned [`FuturesUnordered`] does not contain any futures.
/// In this state, [`FuturesUnordered::poll_next`](Stream::poll_next) will
/// return [`Poll::Ready(None)`](Poll::Ready).
- pub fn new() -> FuturesUnordered<Fut> {
+ pub fn new() -> Self {
let stub = Arc::new(Task {
future: UnsafeCell::new(None),
next_all: AtomicPtr::new(ptr::null_mut()),
@@ -151,7 +151,7 @@ impl<Fut> FuturesUnordered<Fut> {
stub,
});
- FuturesUnordered {
+ Self {
head_all: AtomicPtr::new(ptr::null_mut()),
ready_to_run_queue,
is_terminated: AtomicBool::new(false),
@@ -610,7 +610,7 @@ impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> {
where
I: IntoIterator<Item = Fut>,
{
- let acc = FuturesUnordered::new();
+ let acc = Self::new();
iter.into_iter().fold(acc, |acc, item| { acc.push(item); acc })
}
}
@@ -626,7 +626,7 @@ impl<Fut> Extend<Fut> for FuturesUnordered<Fut> {
where
I: IntoIterator<Item = Fut>,
{
- for item in iter.into_iter() {
+ for item in iter {
self.push(item);
}
}
diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs
index abb0264..261408f 100644
--- a/src/stream/futures_unordered/task.rs
+++ b/src/stream/futures_unordered/task.rs
@@ -70,7 +70,7 @@ impl<Fut> ArcWake for Task<Fut> {
impl<Fut> Task<Fut> {
/// Returns a waker reference for this task without cloning the Arc.
- pub(super) fn waker_ref(this: &Arc<Task<Fut>>) -> WakerRef<'_> {
+ pub(super) fn waker_ref(this: &Arc<Self>) -> WakerRef<'_> {
waker_ref(this)
}
diff --git a/src/stream/iter.rs b/src/stream/iter.rs
index e9df81c..cab8cd8 100644
--- a/src/stream/iter.rs
+++ b/src/stream/iter.rs
@@ -3,7 +3,7 @@ use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
/// Stream for the [`iter`] function.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
iter: I,
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index ca9bc89..a5624ba 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -1,8 +1,13 @@
-//! Streams
+//! Asynchronous streams.
//!
-//! This module contains a number of functions for working with `Stream`s,
-//! including the [`StreamExt`] trait and the [`TryStreamExt`] trait which add
-//! methods to `Stream` types
+//! This module contains:
+//!
+//! - The [`Stream`] trait, for objects that can asynchronously produce a
+//! sequence of values.
+//! - The [`StreamExt`] and [`TryStreamExt`] trait, which provides adapters for
+//! chaining and composing streams.
+//! - Top-level stream constructors like [`iter`](iter()) which creates a
+//! stream from an iterator.
#[cfg(feature = "alloc")]
pub use futures_core::stream::{BoxStream, LocalBoxStream};
@@ -13,9 +18,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
#[allow(clippy::module_inception)]
mod stream;
pub use self::stream::{
- Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, Fuse,
- Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
- StreamFuture, Take, TakeWhile, TakeUntil, Then, Zip,
+ Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
+ Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
+ StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
};
#[cfg(feature = "std")]
@@ -55,7 +60,7 @@ pub use self::try_stream::IntoAsyncRead;
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
-pub use self::try_stream::{TryBufferUnordered, TryForEachConcurrent};
+pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
// Primitive streams
@@ -65,6 +70,9 @@ pub use self::iter::{iter, Iter};
mod repeat;
pub use self::repeat::{repeat, Repeat};
+mod repeat_with;
+pub use self::repeat_with::{repeat_with, RepeatWith};
+
mod empty;
pub use self::empty::{empty, Empty};
diff --git a/src/stream/once.rs b/src/stream/once.rs
index 3a8fef6..318de07 100644
--- a/src/stream/once.rs
+++ b/src/stream/once.rs
@@ -1,8 +1,9 @@
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
/// Creates a stream of a single element.
///
@@ -19,13 +20,14 @@ pub fn once<Fut: Future>(future: Fut) -> Once<Fut> {
Once::new(future)
}
-/// A stream which emits single element and then EOF.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Once<Fut> {
- #[pin]
- future: Option<Fut>
+pin_project! {
+ /// A stream which emits single element and then EOF.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Once<Fut> {
+ #[pin]
+ future: Option<Fut>
+ }
}
impl<Fut> Once<Fut> {
diff --git a/src/stream/pending.rs b/src/stream/pending.rs
index bbe0750..ca793c1 100644
--- a/src/stream/pending.rs
+++ b/src/stream/pending.rs
@@ -36,3 +36,9 @@ impl<T> Stream for Pending<T> {
(0, Some(0))
}
}
+
+impl<T> Clone for Pending<T> {
+ fn clone(&self) -> Self {
+ pending()
+ }
+}
diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs
index 21749eb..6a2637d 100644
--- a/src/stream/repeat.rs
+++ b/src/stream/repeat.rs
@@ -3,7 +3,7 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
/// Stream for the [`repeat`] function.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct Repeat<T> {
item: T,
diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs
new file mode 100644
index 0000000..eb3313d
--- /dev/null
+++ b/src/stream/repeat_with.rs
@@ -0,0 +1,93 @@
+use core::pin::Pin;
+use futures_core::stream::{Stream, FusedStream};
+use futures_core::task::{Context, Poll};
+
+/// An stream that repeats elements of type `A` endlessly by
+/// applying the provided closure `F: FnMut() -> A`.
+///
+/// This `struct` is created by the [`repeat_with()`] function.
+/// See its documentation for more.
+#[derive(Debug, Clone)]
+#[must_use = "streams do nothing unless polled"]
+pub struct RepeatWith<F> {
+ repeater: F,
+}
+
+impl<A, F: FnMut() -> A> Unpin for RepeatWith<F> {}
+
+impl<A, F: FnMut() -> A> Stream for RepeatWith<F> {
+ type Item = A;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Poll::Ready(Some((&mut self.repeater)()))
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (usize::max_value(), None)
+ }
+}
+
+impl<A, F: FnMut() -> A> FusedStream for RepeatWith<F>
+{
+ fn is_terminated(&self) -> bool {
+ false
+ }
+}
+
+/// Creates a new stream that repeats elements of type `A` endlessly by
+/// applying the provided closure, the repeater, `F: FnMut() -> A`.
+///
+/// The `repeat_with()` function calls the repeater over and over again.
+///
+/// Infinite stream like `repeat_with()` are often used with adapters like
+/// [`stream.take()`], in order to make them finite.
+///
+/// If the element type of the stream you need implements [`Clone`], and
+/// it is OK to keep the source element in memory, you should instead use
+/// the [`stream.repeat()`] function.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::stream::{self, StreamExt};
+///
+/// // let's assume we have some value of a type that is not `Clone`
+/// // or which don't want to have in memory just yet because it is expensive:
+/// #[derive(PartialEq, Debug)]
+/// struct Expensive;
+///
+/// // a particular value forever:
+/// let mut things = stream::repeat_with(|| Expensive);
+///
+/// assert_eq!(Some(Expensive), things.next().await);
+/// assert_eq!(Some(Expensive), things.next().await);
+/// assert_eq!(Some(Expensive), things.next().await);
+/// # });
+/// ```
+///
+/// Using mutation and going finite:
+///
+/// ```rust
+/// # futures::executor::block_on(async {
+/// use futures::stream::{self, StreamExt};
+///
+/// // From the zeroth to the third power of two:
+/// let mut curr = 1;
+/// let mut pow2 = stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp })
+/// .take(4);
+///
+/// assert_eq!(Some(1), pow2.next().await);
+/// assert_eq!(Some(2), pow2.next().await);
+/// assert_eq!(Some(4), pow2.next().await);
+/// assert_eq!(Some(8), pow2.next().await);
+///
+/// // ... and now we're done
+/// assert_eq!(None, pow2.next().await);
+/// # });
+/// ```
+pub fn repeat_with<A, F: FnMut() -> A>(repeater: F) -> RepeatWith<F> {
+ RepeatWith { repeater }
+}
diff --git a/src/stream/select.rs b/src/stream/select.rs
index 7666386..2b7ebec 100644
--- a/src/stream/select.rs
+++ b/src/stream/select.rs
@@ -2,18 +2,19 @@ use crate::stream::{StreamExt, Fuse};
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`select()`] function.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Select<St1, St2> {
- #[pin]
- stream1: Fuse<St1>,
- #[pin]
- stream2: Fuse<St2>,
- flag: bool,
+pin_project! {
+ /// Stream for the [`select()`] function.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Select<St1, St2> {
+ #[pin]
+ stream1: Fuse<St1>,
+ #[pin]
+ stream2: Fuse<St2>,
+ flag: bool,
+ }
}
/// This function will attempt to pull items from both streams. Each
diff --git a/src/stream/select_all.rs b/src/stream/select_all.rs
index 2547993..00368bb 100644
--- a/src/stream/select_all.rs
+++ b/src/stream/select_all.rs
@@ -4,6 +4,7 @@ use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
@@ -37,8 +38,8 @@ impl<St: Stream + Unpin> SelectAll<St> {
///
/// The returned `SelectAll` does not contain any streams and, in this
/// state, `SelectAll::poll` will return `Poll::Ready(None)`.
- pub fn new() -> SelectAll<St> {
- SelectAll { inner: FuturesUnordered::new() }
+ pub fn new() -> Self {
+ Self { inner: FuturesUnordered::new() }
}
/// Returns the number of streams contained in the set.
@@ -65,8 +66,8 @@ impl<St: Stream + Unpin> SelectAll<St> {
}
impl<St: Stream + Unpin> Default for SelectAll<St> {
- fn default() -> SelectAll<St> {
- SelectAll::new()
+ fn default() -> Self {
+ Self::new()
}
}
diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs
index 24f4853..de42cfd 100644
--- a/src/stream/stream/buffer_unordered.rs
+++ b/src/stream/stream/buffer_unordered.rs
@@ -4,22 +4,23 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use core::fmt;
use core::pin::Pin;
-/// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
-/// method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct BufferUnordered<St>
-where
- St: Stream,
-{
- #[pin]
- stream: Fuse<St>,
- in_progress_queue: FuturesUnordered<St::Item>,
- max: usize,
+pin_project! {
+ /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct BufferUnordered<St>
+ where
+ St: Stream,
+ {
+ #[pin]
+ stream: Fuse<St>,
+ in_progress_queue: FuturesUnordered<St::Item>,
+ max: usize,
+ }
}
impl<St> fmt::Debug for BufferUnordered<St>
@@ -40,12 +41,12 @@ where
St: Stream,
St::Item: Future,
{
- pub(super) fn new(stream: St, n: usize) -> BufferUnordered<St>
+ pub(super) fn new(stream: St, n: usize) -> Self
where
St: Stream,
St::Item: Future,
{
- BufferUnordered {
+ Self {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesUnordered::new(),
max: n,
diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs
index 626ead1..1af9f49 100644
--- a/src/stream/stream/buffered.rs
+++ b/src/stream/stream/buffered.rs
@@ -1,25 +1,27 @@
use crate::stream::{Fuse, FuturesOrdered, StreamExt};
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use core::fmt;
use core::pin::Pin;
-/// Stream for the [`buffered`](super::StreamExt::buffered) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct Buffered<St>
-where
- St: Stream,
- St::Item: Future,
-{
- #[pin]
- stream: Fuse<St>,
- in_progress_queue: FuturesOrdered<St::Item>,
- max: usize,
+pin_project! {
+ /// Stream for the [`buffered`](super::StreamExt::buffered) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Buffered<St>
+ where
+ St: Stream,
+ St::Item: Future,
+ {
+ #[pin]
+ stream: Fuse<St>,
+ in_progress_queue: FuturesOrdered<St::Item>,
+ max: usize,
+ }
}
impl<St> fmt::Debug for Buffered<St>
@@ -41,8 +43,8 @@ where
St: Stream,
St::Item: Future,
{
- pub(super) fn new(stream: St, n: usize) -> Buffered<St> {
- Buffered {
+ pub(super) fn new(stream: St, n: usize) -> Self {
+ Self {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesOrdered::new(),
max: n,
diff --git a/src/stream/stream/catch_unwind.rs b/src/stream/stream/catch_unwind.rs
index b9eb4ba..d87a40a 100644
--- a/src/stream/stream/catch_unwind.rs
+++ b/src/stream/stream/catch_unwind.rs
@@ -1,23 +1,24 @@
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use std::any::Any;
use std::pin::Pin;
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
-/// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct CatchUnwind<St> {
- #[pin]
- stream: St,
- caught_unwind: bool,
+pin_project! {
+ /// Stream for the [`catch_unwind`](super::StreamExt::catch_unwind) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct CatchUnwind<St> {
+ #[pin]
+ stream: St,
+ caught_unwind: bool,
+ }
}
impl<St: Stream + UnwindSafe> CatchUnwind<St> {
- pub(super) fn new(stream: St) -> CatchUnwind<St> {
- CatchUnwind { stream, caught_unwind: false }
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream, caught_unwind: false }
}
delegate_access_inner!(stream, St, ());
diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs
index c7fbd5f..2be7104 100644
--- a/src/stream/stream/chain.rs
+++ b/src/stream/stream/chain.rs
@@ -1,17 +1,19 @@
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`chain`](super::StreamExt::chain) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Chain<St1, St2> {
- #[pin]
- first: Option<St1>,
- #[pin]
- second: St2,
+pin_project! {
+ /// Stream for the [`chain`](super::StreamExt::chain) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Chain<St1, St2> {
+ #[pin]
+ first: Option<St1>,
+ #[pin]
+ second: St2,
+ }
}
// All interactions with `Pin<&mut Chain<..>>` happen through these methods
@@ -19,8 +21,8 @@ impl<St1, St2> Chain<St1, St2>
where St1: Stream,
St2: Stream<Item = St1::Item>,
{
- pub(super) fn new(stream1: St1, stream2: St2) -> Chain<St1, St2> {
- Chain {
+ pub(super) fn new(stream1: St1, stream2: St2) -> Self {
+ Self {
first: Some(stream1),
second: stream2,
}
diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs
index 9b4ed93..45a3212 100644
--- a/src/stream/stream/chunks.rs
+++ b/src/stream/stream/chunks.rs
@@ -1,29 +1,31 @@
use crate::stream::Fuse;
+use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use core::mem;
use core::pin::Pin;
use alloc::vec::Vec;
-/// Stream for the [`chunks`](super::StreamExt::chunks) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Chunks<St: Stream> {
- #[pin]
- stream: Fuse<St>,
- items: Vec<St::Item>,
- cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+pin_project! {
+ /// Stream for the [`chunks`](super::StreamExt::chunks) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Chunks<St: Stream> {
+ #[pin]
+ stream: Fuse<St>,
+ items: Vec<St::Item>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
}
impl<St: Stream> Chunks<St> where St: Stream {
- pub(super) fn new(stream: St, capacity: usize) -> Chunks<St> {
+ pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
- Chunks {
+ Self {
stream: super::Fuse::new(stream),
items: Vec::with_capacity(capacity),
cap: capacity,
diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs
index 6d07660..774b34b 100644
--- a/src/stream/stream/collect.rs
+++ b/src/stream/stream/collect.rs
@@ -1,18 +1,20 @@
use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`collect`](super::StreamExt::collect) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct Collect<St, C> {
- #[pin]
- stream: St,
- collection: C,
+pin_project! {
+ /// Future for the [`collect`](super::StreamExt::collect) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Collect<St, C> {
+ #[pin]
+ stream: St,
+ collection: C,
+ }
}
impl<St: Stream, C: Default> Collect<St, C> {
@@ -20,8 +22,8 @@ impl<St: Stream, C: Default> Collect<St, C> {
mem::replace(self.project().collection, Default::default())
}
- pub(super) fn new(stream: St) -> Collect<St, C> {
- Collect {
+ pub(super) fn new(stream: St) -> Self {
+ Self {
stream,
collection: Default::default(),
}
diff --git a/src/stream/stream/concat.rs b/src/stream/stream/concat.rs
index 9b37cd2..ee1349f 100644
--- a/src/stream/stream/concat.rs
+++ b/src/stream/stream/concat.rs
@@ -1,17 +1,19 @@
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
+use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`concat`](super::StreamExt::concat) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct Concat<St: Stream> {
- #[pin]
- stream: St,
- accum: Option<St::Item>,
+pin_project! {
+ /// Future for the [`concat`](super::StreamExt::concat) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Concat<St: Stream> {
+ #[pin]
+ stream: St,
+ accum: Option<St::Item>,
+ }
}
impl<St> Concat<St>
@@ -19,8 +21,8 @@ where St: Stream,
St::Item: Extend<<St::Item as IntoIterator>::Item> +
IntoIterator + Default,
{
- pub(super) fn new(stream: St) -> Concat<St> {
- Concat {
+ pub(super) fn new(stream: St) -> Self {
+ Self {
stream,
accum: None,
}
diff --git a/src/stream/stream/cycle.rs b/src/stream/stream/cycle.rs
new file mode 100644
index 0000000..a5b7dc0
--- /dev/null
+++ b/src/stream/stream/cycle.rs
@@ -0,0 +1,71 @@
+use core::pin::Pin;
+use core::usize;
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`cycle`](super::StreamExt::cycle) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Cycle<St> {
+ orig: St,
+ #[pin]
+ stream: St,
+ }
+}
+
+impl<St> Cycle<St>
+where
+ St: Clone + Stream,
+{
+ pub(super) fn new(stream: St) -> Self {
+ Self {
+ orig: stream.clone(),
+ stream,
+ }
+ }
+}
+
+impl<St> Stream for Cycle<St>
+where
+ St: Clone + Stream,
+{
+ type Item = St::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ None => {
+ this.stream.set(this.orig.clone());
+ this.stream.poll_next(cx)
+ }
+ item => Poll::Ready(item),
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ // the cycle stream is either empty or infinite
+ match self.orig.size_hint() {
+ size @ (0, Some(0)) => size,
+ (0, _) => (0, None),
+ _ => (usize::max_value(), None),
+ }
+ }
+}
+
+impl<St> FusedStream for Cycle<St>
+where
+ St: Clone + Stream,
+{
+ fn is_terminated(&self) -> bool {
+ // the cycle stream is either empty or infinite
+ if let (0, Some(0)) = self.size_hint() {
+ true
+ } else {
+ false
+ }
+ }
+}
diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs
index 4e6bac2..7d4c9cb 100644
--- a/src/stream/stream/enumerate.rs
+++ b/src/stream/stream/enumerate.rs
@@ -1,23 +1,25 @@
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Enumerate<St> {
- #[pin]
- stream: St,
- count: usize,
+pin_project! {
+ /// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Enumerate<St> {
+ #[pin]
+ stream: St,
+ count: usize,
+ }
}
impl<St: Stream> Enumerate<St> {
- pub(super) fn new(stream: St) -> Enumerate<St> {
- Enumerate {
+ pub(super) fn new(stream: St) -> Self {
+ Self {
stream,
count: 0,
}
diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs
index 55493fe..57de025 100644
--- a/src/stream/stream/filter.rs
+++ b/src/stream/stream/filter.rs
@@ -1,25 +1,27 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use crate::fns::FnMut1;
-/// Stream for the [`filter`](super::StreamExt::filter) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct Filter<St, Fut, F>
- where St: Stream,
-{
- #[pin]
- stream: St,
- f: F,
- #[pin]
- pending_fut: Option<Fut>,
- pending_item: Option<St::Item>,
+pin_project! {
+ /// Stream for the [`filter`](super::StreamExt::filter) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Filter<St, Fut, F>
+ where St: Stream,
+ {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Item>,
+ }
}
impl<St, Fut, F> fmt::Debug for Filter<St, Fut, F>
@@ -43,8 +45,8 @@ where St: Stream,
F: for<'a> FnMut1<&'a St::Item, Output=Fut>,
Fut: Future<Output = bool>,
{
- pub(super) fn new(stream: St, f: F) -> Filter<St, Fut, F> {
- Filter {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self {
stream,
f,
pending_fut: None,
diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs
index 50c440f..b762fac 100644
--- a/src/stream/stream/filter_map.rs
+++ b/src/stream/stream/filter_map.rs
@@ -1,22 +1,24 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use crate::fns::FnMut1;
-/// Stream for the [`filter_map`](super::StreamExt::filter_map) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct FilterMap<St, Fut, F> {
- #[pin]
- stream: St,
- f: F,
- #[pin]
- pending: Option<Fut>,
+pin_project! {
+ /// Stream for the [`filter_map`](super::StreamExt::filter_map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct FilterMap<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending: Option<Fut>,
+ }
}
impl<St, Fut, F> fmt::Debug for FilterMap<St, Fut, F>
@@ -37,8 +39,8 @@ impl<St, Fut, F> FilterMap<St, Fut, F>
F: FnMut(St::Item) -> Fut,
Fut: Future,
{
- pub(super) fn new(stream: St, f: F) -> FilterMap<St, Fut, F> {
- FilterMap { stream, f, pending: None }
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self { stream, f, pending: None }
}
delegate_access_inner!(stream, St, ());
diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs
index 75bbc21..9f6b7a4 100644
--- a/src/stream/stream/flatten.rs
+++ b/src/stream/stream/flatten.rs
@@ -1,19 +1,21 @@
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`flatten`](super::StreamExt::flatten) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Flatten<St, U> {
- #[pin]
- stream: St,
- #[pin]
- next: Option<U>,
+pin_project! {
+ /// Stream for the [`flatten`](super::StreamExt::flatten) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Flatten<St, U> {
+ #[pin]
+ stream: St,
+ #[pin]
+ next: Option<U>,
+ }
}
impl<St, U> Flatten<St, U> {
diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs
index 6fce256..e109c3b 100644
--- a/src/stream/stream/fold.rs
+++ b/src/stream/stream/fold.rs
@@ -1,20 +1,22 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`fold`](super::StreamExt::fold) method.
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct Fold<St, Fut, T, F> {
- #[pin]
- stream: St,
- f: F,
- accum: Option<T>,
- #[pin]
- future: Option<Fut>,
+pin_project! {
+ /// Future for the [`fold`](super::StreamExt::fold) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Fold<St, Fut, T, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ accum: Option<T>,
+ #[pin]
+ future: Option<Fut>,
+ }
}
impl<St, Fut, T, F> fmt::Debug for Fold<St, Fut, T, F>
@@ -37,8 +39,8 @@ where St: Stream,
F: FnMut(T, St::Item) -> Fut,
Fut: Future<Output = T>,
{
- pub(super) fn new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F> {
- Fold {
+ pub(super) fn new(stream: St, f: F, t: T) -> Self {
+ Self {
stream,
f,
accum: Some(t),
diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs
index c8af21b..ee90e66 100644
--- a/src/stream/stream/for_each.rs
+++ b/src/stream/stream/for_each.rs
@@ -1,19 +1,21 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`for_each`](super::StreamExt::for_each) method.
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct ForEach<St, Fut, F> {
- #[pin]
- stream: St,
- f: F,
- #[pin]
- future: Option<Fut>,
+pin_project! {
+ /// Future for the [`for_each`](super::StreamExt::for_each) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct ForEach<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ future: Option<Fut>,
+ }
}
impl<St, Fut, F> fmt::Debug for ForEach<St, Fut, F>
@@ -34,8 +36,8 @@ where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
- pub(super) fn new(stream: St, f: F) -> ForEach<St, Fut, F> {
- ForEach {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self {
stream,
f,
future: None,
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs
index 843ddaa..cee0ba1 100644
--- a/src/stream/stream/for_each_concurrent.rs
+++ b/src/stream/stream/for_each_concurrent.rs
@@ -5,18 +5,19 @@ use core::num::NonZeroUsize;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent)
-/// method.
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct ForEachConcurrent<St, Fut, F> {
- #[pin]
- stream: Option<St>,
- f: F,
- futures: FuturesUnordered<Fut>,
- limit: Option<NonZeroUsize>,
+pin_project! {
+ /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent)
+ /// method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct ForEachConcurrent<St, Fut, F> {
+ #[pin]
+ stream: Option<St>,
+ f: F,
+ futures: FuturesUnordered<Fut>,
+ limit: Option<NonZeroUsize>,
+ }
}
impl<St, Fut, F> fmt::Debug for ForEachConcurrent<St, Fut, F>
@@ -38,8 +39,8 @@ where St: Stream,
F: FnMut(St::Item) -> Fut,
Fut: Future<Output = ()>,
{
- pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> ForEachConcurrent<St, Fut, F> {
- ForEachConcurrent {
+ pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
+ Self {
stream: Some(stream),
// Note: `limit` = 0 gets ignored.
limit: limit.and_then(NonZeroUsize::new),
diff --git a/src/stream/stream/forward.rs b/src/stream/stream/forward.rs
index feef113..2247b21 100644
--- a/src/stream/stream/forward.rs
+++ b/src/stream/stream/forward.rs
@@ -1,26 +1,29 @@
use crate::stream::Fuse;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`forward`](super::StreamExt::forward) method.
-#[pin_project(project = ForwardProj)]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct Forward<St, Si, Item> {
- #[pin]
- sink: Option<Si>,
- #[pin]
- stream: Fuse<St>,
- buffered_item: Option<Item>,
+pin_project! {
+ /// Future for the [`forward`](super::StreamExt::forward) method.
+ #[project = ForwardProj]
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Forward<St, Si, Item> {
+ #[pin]
+ sink: Option<Si>,
+ #[pin]
+ stream: Fuse<St>,
+ buffered_item: Option<Item>,
+ }
}
impl<St, Si, Item> Forward<St, Si, Item> {
pub(crate) fn new(stream: St, sink: Si) -> Self {
- Forward {
+ Self {
sink: Some(sink),
stream: Fuse::new(stream),
buffered_item: None,
diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs
index 408ad81..e1d8c12 100644
--- a/src/stream/stream/fuse.rs
+++ b/src/stream/stream/fuse.rs
@@ -1,23 +1,25 @@
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`fuse`](super::StreamExt::fuse) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Fuse<St> {
- #[pin]
- stream: St,
- done: bool,
+pin_project! {
+ /// Stream for the [`fuse`](super::StreamExt::fuse) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Fuse<St> {
+ #[pin]
+ stream: St,
+ done: bool,
+ }
}
impl<St> Fuse<St> {
- pub(super) fn new(stream: St) -> Fuse<St> {
- Fuse { stream, done: false }
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream, done: false }
}
/// Returns whether the underlying stream has finished or not.
diff --git a/src/stream/stream/into_future.rs b/src/stream/stream/into_future.rs
index 8aa2b1e..a9a1e23 100644
--- a/src/stream/stream/into_future.rs
+++ b/src/stream/stream/into_future.rs
@@ -1,6 +1,7 @@
use crate::stream::StreamExt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
@@ -12,8 +13,8 @@ pub struct StreamFuture<St> {
}
impl<St: Stream + Unpin> StreamFuture<St> {
- pub(super) fn new(stream: St) -> StreamFuture<St> {
- StreamFuture { stream: Some(stream) }
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream: Some(stream) }
}
/// Acquires a reference to the underlying stream that this combinator is
diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs
index c877512..1a269f0 100644
--- a/src/stream/stream/map.rs
+++ b/src/stream/stream/map.rs
@@ -1,20 +1,22 @@
use core::fmt;
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use crate::fns::FnMut1;
-/// Stream for the [`map`](super::StreamExt::map) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct Map<St, F> {
- #[pin]
- stream: St,
- f: F,
+pin_project! {
+ /// Stream for the [`map`](super::StreamExt::map) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Map<St, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ }
}
impl<St, F> fmt::Debug for Map<St, F>
@@ -29,8 +31,8 @@ where
}
impl<St, F> Map<St, F> {
- pub(crate) fn new(stream: St, f: F) -> Map<St, F> {
- Map { stream, f }
+ pub(crate) fn new(stream: St, f: F) -> Self {
+ Self { stream, f }
}
delegate_access_inner!(stream, St, ());
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index f988468..b1b4384 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -29,10 +29,18 @@ mod collect;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::collect::Collect;
+mod unzip;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::unzip::Unzip;
+
mod concat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::concat::Concat;
+mod cycle;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::cycle::Cycle;
+
mod enumerate;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::enumerate::Enumerate;
@@ -473,6 +481,45 @@ pub trait StreamExt: Stream {
assert_future::<C, _>(Collect::new(self))
}
+ /// Converts a stream of pairs into a future, which
+ /// resolves to pair of containers.
+ ///
+ /// `unzip()` produces a future, which resolves to two
+ /// collections: one from the left elements of the pairs,
+ /// and one from the right elements.
+ ///
+ /// The returned future will be resolved when the stream terminates.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::StreamExt;
+ /// use std::thread;
+ ///
+ /// let (tx, rx) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx.unbounded_send((1, 2)).unwrap();
+ /// tx.unbounded_send((3, 4)).unwrap();
+ /// tx.unbounded_send((5, 6)).unwrap();
+ /// });
+ ///
+ /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
+ /// assert_eq!(o1, vec![1, 3, 5]);
+ /// assert_eq!(o2, vec![2, 4, 6]);
+ /// # });
+ /// ```
+ fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
+ where
+ FromA: Default + Extend<A>,
+ FromB: Default + Extend<B>,
+ Self: Sized + Stream<Item = (A, B)>,
+ {
+ assert_future::<(FromA, FromB), _>(Unzip::new(self))
+ }
+
/// Concatenate all items of a stream into a single extendable
/// destination, returning a future representing the end result.
///
@@ -513,6 +560,36 @@ pub trait StreamExt: Stream {
assert_future::<Self::Item, _>(Concat::new(self))
}
+ /// Repeats a stream endlessly.
+ ///
+ /// The stream never terminates. Note that you likely want to avoid
+ /// usage of `collect` or such on the returned stream as it will exhaust
+ /// available memory as it tries to just fill up all RAM.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ /// let a = [1, 2, 3];
+ /// let mut s = stream::iter(a.iter()).cycle();
+ ///
+ /// assert_eq!(s.next().await, Some(&1));
+ /// assert_eq!(s.next().await, Some(&2));
+ /// assert_eq!(s.next().await, Some(&3));
+ /// assert_eq!(s.next().await, Some(&1));
+ /// assert_eq!(s.next().await, Some(&2));
+ /// assert_eq!(s.next().await, Some(&3));
+ /// assert_eq!(s.next().await, Some(&1));
+ /// # });
+ /// ```
+ fn cycle(self) -> Cycle<Self>
+ where
+ Self: Sized + Clone,
+ {
+ assert_stream::<Self::Item, _>(Cycle::new(self))
+ }
+
/// Execute an accumulating asynchronous computation over a stream,
/// collecting all the values into one final result.
///
diff --git a/src/stream/stream/next.rs b/src/stream/stream/next.rs
index 2d74632..6949878 100644
--- a/src/stream/stream/next.rs
+++ b/src/stream/stream/next.rs
@@ -15,7 +15,7 @@ impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}
impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
- Next { stream }
+ Self { stream }
}
}
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs
index 1d8c342..a403110 100644
--- a/src/stream/stream/peek.rs
+++ b/src/stream/stream/peek.rs
@@ -2,29 +2,31 @@ use crate::stream::{Fuse, StreamExt};
use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
-
-/// A `Stream` that implements a `peek` method.
-///
-/// The `peek` method can be used to retrieve a reference
-/// to the next `Stream::Item` if available. A subsequent
-/// call to `poll` will return the owned item.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Peekable<St: Stream> {
- #[pin]
- stream: Fuse<St>,
- peeked: Option<St::Item>,
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// A `Stream` that implements a `peek` method.
+ ///
+ /// The `peek` method can be used to retrieve a reference
+ /// to the next `Stream::Item` if available. A subsequent
+ /// call to `poll` will return the owned item.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Peekable<St: Stream> {
+ #[pin]
+ stream: Fuse<St>,
+ peeked: Option<St::Item>,
+ }
}
impl<St: Stream> Peekable<St> {
- pub(super) fn new(stream: St) -> Peekable<St> {
- Peekable {
+ pub(super) fn new(stream: St) -> Self {
+ Self {
stream: stream.fuse(),
peeked: None,
}
@@ -100,11 +102,12 @@ where
delegate_sink!(stream, Item);
}
-/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
-#[pin_project]
-#[must_use = "futures do nothing unless polled"]
-pub struct Peek<'a, St: Stream> {
- inner: Option<Pin<&'a mut Peekable<St>>>,
+pin_project! {
+ /// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
+ #[must_use = "futures do nothing unless polled"]
+ pub struct Peek<'a, St: Stream> {
+ inner: Option<Pin<&'a mut Peekable<St>>>,
+ }
}
impl<St> fmt::Debug for Peek<'_, St>
diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs
index 192d3c6..b6e3e5c 100644
--- a/src/stream/stream/ready_chunks.rs
+++ b/src/stream/stream/ready_chunks.rs
@@ -3,27 +3,28 @@ use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use core::mem;
use core::pin::Pin;
use alloc::vec::Vec;
-/// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct ReadyChunks<St: Stream> {
- #[pin]
- stream: Fuse<St>,
- items: Vec<St::Item>,
- cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+pin_project! {
+ /// Stream for the [`ready_chunks`](super::StreamExt::ready_chunks) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct ReadyChunks<St: Stream> {
+ #[pin]
+ stream: Fuse<St>,
+ items: Vec<St::Item>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
}
impl<St: Stream> ReadyChunks<St> where St: Stream {
- pub(super) fn new(stream: St, capacity: usize) -> ReadyChunks<St> {
+ pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
- ReadyChunks {
+ Self {
stream: super::Fuse::new(stream),
items: Vec::with_capacity(capacity),
cap: capacity,
diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs
index dd0316d..2097280 100644
--- a/src/stream/stream/scan.rs
+++ b/src/stream/stream/scan.rs
@@ -1,26 +1,28 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
struct StateFn<S, F> {
state: S,
f: F,
}
-/// Stream for the [`scan`](super::StreamExt::scan) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct Scan<St: Stream, S, Fut, F> {
- #[pin]
- stream: St,
- state_f: Option<StateFn<S, F>>,
- #[pin]
- future: Option<Fut>,
+pin_project! {
+ /// Stream for the [`scan`](super::StreamExt::scan) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Scan<St: Stream, S, Fut, F> {
+ #[pin]
+ stream: St,
+ state_f: Option<StateFn<S, F>>,
+ #[pin]
+ future: Option<Fut>,
+ }
}
impl<St, S, Fut, F> fmt::Debug for Scan<St, S, Fut, F>
@@ -53,8 +55,8 @@ where
F: FnMut(&mut S, St::Item) -> Fut,
Fut: Future<Output = Option<B>>,
{
- pub(super) fn new(stream: St, initial_state: S, f: F) -> Scan<St, S, Fut, F> {
- Scan {
+ pub(super) fn new(stream: St, initial_state: S, f: F) -> Self {
+ Self {
stream,
state_f: Some(StateFn {
state: initial_state,
diff --git a/src/stream/stream/select_next_some.rs b/src/stream/stream/select_next_some.rs
index 884f252..fe7a089 100644
--- a/src/stream/stream/select_next_some.rs
+++ b/src/stream/stream/select_next_some.rs
@@ -1,4 +1,5 @@
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::FusedStream;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
@@ -14,7 +15,7 @@ pub struct SelectNextSome<'a, St: ?Sized> {
impl<'a, St: ?Sized> SelectNextSome<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
- SelectNextSome { stream }
+ Self { stream }
}
}
diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs
index ea31b17..6ffcf57 100644
--- a/src/stream/stream/skip.rs
+++ b/src/stream/stream/skip.rs
@@ -1,23 +1,25 @@
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`skip`](super::StreamExt::skip) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Skip<St> {
- #[pin]
- stream: St,
- remaining: usize,
+pin_project! {
+ /// Stream for the [`skip`](super::StreamExt::skip) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Skip<St> {
+ #[pin]
+ stream: St,
+ remaining: usize,
+ }
}
impl<St: Stream> Skip<St> {
- pub(super) fn new(stream: St, n: usize) -> Skip<St> {
- Skip {
+ pub(super) fn new(stream: St, n: usize) -> Self {
+ Self {
stream,
remaining: n,
}
diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs
index 2c58102..e1aa3f9 100644
--- a/src/stream/stream/skip_while.rs
+++ b/src/stream/stream/skip_while.rs
@@ -1,23 +1,25 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct SkipWhile<St, Fut, F> where St: Stream {
- #[pin]
- stream: St,
- f: F,
- #[pin]
- pending_fut: Option<Fut>,
- pending_item: Option<St::Item>,
- done_skipping: bool,
+pin_project! {
+ /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct SkipWhile<St, Fut, F> where St: Stream {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Item>,
+ done_skipping: bool,
+ }
}
impl<St, Fut, F> fmt::Debug for SkipWhile<St, Fut, F>
@@ -41,8 +43,8 @@ impl<St, Fut, F> SkipWhile<St, Fut, F>
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
- pub(super) fn new(stream: St, f: F) -> SkipWhile<St, Fut, F> {
- SkipWhile {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self {
stream,
f,
pending_fut: None,
diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs
index 991eb16..997b974 100644
--- a/src/stream/stream/split.rs
+++ b/src/stream/stream/split.rs
@@ -1,3 +1,4 @@
+use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs
index 1956a82..124d397 100644
--- a/src/stream/stream/take.rs
+++ b/src/stream/stream/take.rs
@@ -1,24 +1,26 @@
use core::cmp;
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`take`](super::StreamExt::take) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Take<St> {
- #[pin]
- stream: St,
- remaining: usize,
+pin_project! {
+ /// Stream for the [`take`](super::StreamExt::take) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Take<St> {
+ #[pin]
+ stream: St,
+ remaining: usize,
+ }
}
impl<St: Stream> Take<St> {
- pub(super) fn new(stream: St, n: usize) -> Take<St> {
- Take {
+ pub(super) fn new(stream: St, n: usize) -> Self {
+ Self {
stream,
remaining: n,
}
diff --git a/src/stream/stream/take_until.rs b/src/stream/stream/take_until.rs
index 4e32ad8..4dea01a 100644
--- a/src/stream/stream/take_until.rs
+++ b/src/stream/stream/take_until.rs
@@ -1,28 +1,30 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
// FIXME: docs, tests
-/// Stream for the [`take_until`](super::StreamExt::take_until) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct TakeUntil<St: Stream, Fut: Future> {
- #[pin]
- stream: St,
- /// Contains the inner Future on start and None once the inner Future is resolved
- /// or taken out by the user.
- #[pin]
- fut: Option<Fut>,
- /// Contains fut's return value once fut is resolved
- fut_result: Option<Fut::Output>,
- /// Whether the future was taken out by the user.
- free: bool,
+pin_project! {
+ /// Stream for the [`take_until`](super::StreamExt::take_until) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TakeUntil<St: Stream, Fut: Future> {
+ #[pin]
+ stream: St,
+ // Contains the inner Future on start and None once the inner Future is resolved
+ // or taken out by the user.
+ #[pin]
+ fut: Option<Fut>,
+ // Contains fut's return value once fut is resolved
+ fut_result: Option<Fut::Output>,
+ // Whether the future was taken out by the user.
+ free: bool,
+ }
}
impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
@@ -44,8 +46,8 @@ where
St: Stream,
Fut: Future,
{
- pub(super) fn new(stream: St, fut: Fut) -> TakeUntil<St, Fut> {
- TakeUntil {
+ pub(super) fn new(stream: St, fut: Fut) -> Self {
+ Self {
stream,
fut: Some(fut),
fut_result: None,
@@ -73,7 +75,7 @@ where
/// This may be used to retrieve arbitrary data from the stopping
/// future, for example a reason why the stream was stopped.
///
- /// This method will return `None` if the future isn't resovled yet,
+ /// This method will return `None` if the future isn't resolved yet,
/// or if the result was already taken out.
///
/// # Examples
diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs
index e3a6d00..4cdba83 100644
--- a/src/stream/stream/take_while.rs
+++ b/src/stream/stream/take_while.rs
@@ -1,23 +1,25 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`take_while`](super::StreamExt::take_while) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct TakeWhile<St: Stream, Fut, F> {
- #[pin]
- stream: St,
- f: F,
- #[pin]
- pending_fut: Option<Fut>,
- pending_item: Option<St::Item>,
- done_taking: bool,
+pin_project! {
+ /// Stream for the [`take_while`](super::StreamExt::take_while) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TakeWhile<St: Stream, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Item>,
+ done_taking: bool,
+ }
}
impl<St, Fut, F> fmt::Debug for TakeWhile<St, Fut, F>
@@ -41,8 +43,8 @@ impl<St, Fut, F> TakeWhile<St, Fut, F>
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
- pub(super) fn new(stream: St, f: F) -> TakeWhile<St, Fut, F> {
- TakeWhile {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self {
stream,
f,
pending_fut: None,
diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs
index 1334a6c..3d42bdd 100644
--- a/src/stream/stream/then.rs
+++ b/src/stream/stream/then.rs
@@ -1,21 +1,23 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`then`](super::StreamExt::then) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct Then<St, Fut, F> {
- #[pin]
- stream: St,
- #[pin]
- future: Option<Fut>,
- f: F,
+pin_project! {
+ /// Stream for the [`then`](super::StreamExt::then) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Then<St, Fut, F> {
+ #[pin]
+ stream: St,
+ #[pin]
+ future: Option<Fut>,
+ f: F,
+ }
}
impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
@@ -35,8 +37,8 @@ impl<St, Fut, F> Then<St, Fut, F>
where St: Stream,
F: FnMut(St::Item) -> Fut,
{
- pub(super) fn new(stream: St, f: F) -> Then<St, Fut, F> {
- Then {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self {
stream,
future: None,
f,
diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs
new file mode 100644
index 0000000..5024770
--- /dev/null
+++ b/src/stream/stream/unzip.rs
@@ -0,0 +1,68 @@
+use core::mem;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`unzip`](super::StreamExt::unzip) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Unzip<St, FromA, FromB> {
+ #[pin]
+ stream: St,
+ left: FromA,
+ right: FromB,
+ }
+}
+
+impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> {
+ fn finish(self: Pin<&mut Self>) -> (FromA, FromB) {
+ let this = self.project();
+ (
+ mem::replace(this.left, Default::default()),
+ mem::replace(this.right, Default::default()),
+ )
+ }
+
+ pub(super) fn new(stream: St) -> Self {
+ Self {
+ stream,
+ left: Default::default(),
+ right: Default::default(),
+ }
+ }
+}
+
+impl<St, A, B, FromA, FromB> FusedFuture for Unzip<St, FromA, FromB>
+where St: FusedStream<Item = (A, B)>,
+ FromA: Default + Extend<A>,
+ FromB: Default + Extend<B>,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+impl<St, A, B, FromA, FromB> Future for Unzip<St, FromA, FromB>
+where St: Stream<Item = (A, B)>,
+ FromA: Default + Extend<A>,
+ FromB: Default + Extend<B>,
+{
+ type Output = (FromA, FromB);
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<(FromA, FromB)> {
+ let mut this = self.as_mut().project();
+ loop {
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(e) => {
+ this.left.extend(Some(e.0));
+ this.right.extend(Some(e.1));
+ },
+ None => return Poll::Ready(self.finish()),
+ }
+ }
+ }
+}
diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs
index 522f7e1..588531a 100644
--- a/src/stream/stream/zip.rs
+++ b/src/stream/stream/zip.rs
@@ -3,24 +3,25 @@ use core::cmp;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
-
-/// Stream for the [`zip`](super::StreamExt::zip) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Zip<St1: Stream, St2: Stream> {
- #[pin]
- stream1: Fuse<St1>,
- #[pin]
- stream2: Fuse<St2>,
- queued1: Option<St1::Item>,
- queued2: Option<St2::Item>,
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`zip`](super::StreamExt::zip) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Zip<St1: Stream, St2: Stream> {
+ #[pin]
+ stream1: Fuse<St1>,
+ #[pin]
+ stream2: Fuse<St2>,
+ queued1: Option<St1::Item>,
+ queued2: Option<St2::Item>,
+ }
}
impl<St1: Stream, St2: Stream> Zip<St1, St2> {
- pub(super) fn new(stream1: St1, stream2: St2) -> Zip<St1, St2> {
- Zip {
+ pub(super) fn new(stream1: St1, stream2: St2) -> Self {
+ Self {
stream1: stream1.fuse(),
stream2: stream2.fuse(),
queued1: None,
diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs
index 4d24c4f..b185646 100644
--- a/src/stream/try_stream/and_then.rs
+++ b/src/stream/try_stream/and_then.rs
@@ -1,21 +1,23 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
+use futures_core::ready;
use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct AndThen<St, Fut, F> {
- #[pin]
- stream: St,
- #[pin]
- future: Option<Fut>,
- f: F,
+pin_project! {
+ /// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct AndThen<St, Fut, F> {
+ #[pin]
+ stream: St,
+ #[pin]
+ future: Option<Fut>,
+ f: F,
+ }
}
impl<St, Fut, F> fmt::Debug for AndThen<St, Fut, F>
diff --git a/src/stream/try_stream/into_async_read.rs b/src/stream/try_stream/into_async_read.rs
index 10291b0..197c105 100644
--- a/src/stream/try_stream/into_async_read.rs
+++ b/src/stream/try_stream/into_async_read.rs
@@ -1,5 +1,6 @@
use crate::stream::TryStreamExt;
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, AsyncWrite, AsyncBufRead};
@@ -39,7 +40,7 @@ where
St::Ok: AsRef<[u8]>,
{
pub(super) fn new(stream: St) -> Self {
- IntoAsyncRead {
+ Self {
stream,
state: ReadState::PendingChunk,
}
diff --git a/src/stream/try_stream/into_stream.rs b/src/stream/try_stream/into_stream.rs
index 370a327..89bc3ef 100644
--- a/src/stream/try_stream/into_stream.rs
+++ b/src/stream/try_stream/into_stream.rs
@@ -3,21 +3,22 @@ use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
-
-/// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct IntoStream<St> {
- #[pin]
- stream: St,
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct IntoStream<St> {
+ #[pin]
+ stream: St,
+ }
}
impl<St> IntoStream<St> {
#[inline]
pub(super) fn new(stream: St) -> Self {
- IntoStream { stream }
+ Self { stream }
}
delegate_access_inner!(stream, St, ());
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 0c8e108..6a48a4c 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -115,6 +115,12 @@ cfg_target_has_atomic! {
pub use self::try_buffer_unordered::TryBufferUnordered;
#[cfg(feature = "alloc")]
+ mod try_buffered;
+ #[cfg(feature = "alloc")]
+ #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+ pub use self::try_buffered::TryBuffered;
+
+ #[cfg(feature = "alloc")]
mod try_for_each_concurrent;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
@@ -773,7 +779,7 @@ pub trait TryStreamExt: TryStream {
assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
}
- /// Attempt to execute several futures from a stream concurrently.
+ /// Attempt to execute several futures from a stream concurrently (unordered).
///
/// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
/// that matches the stream's `Error` type.
@@ -842,6 +848,80 @@ pub trait TryStreamExt: TryStream {
)
}
+ /// Attempt to execute several futures from a stream concurrently.
+ ///
+ /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
+ /// that matches the stream's `Error` type.
+ ///
+ /// This adaptor will buffer up to `n` futures and then return their
+ /// outputs in the order. If the underlying stream returns an error, it will
+ /// be immediately propagated.
+ ///
+ /// The returned stream will be a stream of results, each containing either
+ /// an error or a future's output. An error can be produced either by the
+ /// underlying stream itself or by one of the futures it yielded.
+ ///
+ /// This method is only available when the `std` or `alloc` feature of this
+ /// library is activated, and it is activated by default.
+ ///
+ /// # Examples
+ ///
+ /// Results are returned in the order of addition:
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::oneshot;
+ /// use futures::future::lazy;
+ /// use futures::stream::{self, StreamExt, TryStreamExt};
+ ///
+ /// let (send_one, recv_one) = oneshot::channel();
+ /// let (send_two, recv_two) = oneshot::channel();
+ ///
+ /// let mut buffered = lazy(move |cx| {
+ /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
+ ///
+ /// let mut buffered = stream_of_futures.try_buffered(10);
+ ///
+ /// assert!(buffered.try_poll_next_unpin(cx).is_pending());
+ ///
+ /// send_two.send(2i32)?;
+ /// assert!(buffered.try_poll_next_unpin(cx).is_pending());
+ /// Ok::<_, i32>(buffered)
+ /// }).await?;
+ ///
+ /// send_one.send(1i32)?;
+ /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
+ /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
+ ///
+ /// assert_eq!(buffered.next().await, None);
+ /// # Ok::<(), i32>(()) }).unwrap();
+ /// ```
+ ///
+ /// Errors from the underlying stream itself are propagated:
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::{StreamExt, TryStreamExt};
+ ///
+ /// let (sink, stream_of_futures) = mpsc::unbounded();
+ /// let mut buffered = stream_of_futures.try_buffered(10);
+ ///
+ /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
+ /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
+ ///
+ /// sink.unbounded_send(Err("error in the stream"))?;
+ /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+ #[cfg(feature = "alloc")]
+ fn try_buffered(self, n: usize) -> TryBuffered<Self>
+ where
+ Self::Ok: TryFuture<Error = Self::Error>,
+ Self: Sized,
+ {
+ TryBuffered::new(self, n)
+ }
+
// TODO: false positive warning from rustdoc. Verify once #43466 settles
//
/// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs
index d972c0f..999123a 100644
--- a/src/stream/try_stream/or_else.rs
+++ b/src/stream/try_stream/or_else.rs
@@ -1,21 +1,23 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
+use futures_core::ready;
use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct OrElse<St, Fut, F> {
- #[pin]
- stream: St,
- #[pin]
- future: Option<Fut>,
- f: F,
+pin_project! {
+ /// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct OrElse<St, Fut, F> {
+ #[pin]
+ stream: St,
+ #[pin]
+ future: Option<Fut>,
+ f: F,
+ }
}
impl<St, Fut, F> fmt::Debug for OrElse<St, Fut, F>
diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs
index c1c931a..71c6fc7 100644
--- a/src/stream/try_stream/try_buffer_unordered.rs
+++ b/src/stream/try_stream/try_buffer_unordered.rs
@@ -5,21 +5,22 @@ use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
use core::pin::Pin;
-/// Stream for the
-/// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct TryBufferUnordered<St>
- where St: TryStream
-{
- #[pin]
- stream: Fuse<IntoStream<St>>,
- in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>,
- max: usize,
+pin_project! {
+ /// Stream for the
+ /// [`try_buffer_unordered`](super::TryStreamExt::try_buffer_unordered) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryBufferUnordered<St>
+ where St: TryStream
+ {
+ #[pin]
+ stream: Fuse<IntoStream<St>>,
+ in_progress_queue: FuturesUnordered<IntoFuture<St::Ok>>,
+ max: usize,
+ }
}
impl<St> TryBufferUnordered<St>
@@ -27,7 +28,7 @@ impl<St> TryBufferUnordered<St>
St::Ok: TryFuture,
{
pub(super) fn new(stream: St, n: usize) -> Self {
- TryBufferUnordered {
+ Self {
stream: IntoStream::new(stream).fuse(),
in_progress_queue: FuturesUnordered::new(),
max: n,
diff --git a/src/stream/try_stream/try_buffered.rs b/src/stream/try_stream/try_buffered.rs
new file mode 100644
index 0000000..ff7e844
--- /dev/null
+++ b/src/stream/try_stream/try_buffered.rs
@@ -0,0 +1,90 @@
+use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream};
+use crate::future::{IntoFuture, TryFutureExt};
+use futures_core::future::TryFuture;
+use futures_core::stream::{Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+use core::pin::Pin;
+
+pin_project! {
+ /// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryBuffered<St>
+ where
+ St: TryStream,
+ St::Ok: TryFuture,
+ {
+ #[pin]
+ stream: Fuse<IntoStream<St>>,
+ in_progress_queue: FuturesOrdered<IntoFuture<St::Ok>>,
+ max: usize,
+ }
+}
+
+impl<St> TryBuffered<St>
+where
+ St: TryStream,
+ St::Ok: TryFuture,
+{
+ pub(super) fn new(stream: St, n: usize) -> Self {
+ Self {
+ stream: IntoStream::new(stream).fuse(),
+ in_progress_queue: FuturesOrdered::new(),
+ max: n,
+ }
+ }
+
+ delegate_access_inner!(stream, St, (. .));
+}
+
+impl<St> Stream for TryBuffered<St>
+where
+ St: TryStream,
+ St::Ok: TryFuture<Error = St::Error>,
+{
+ type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+
+ // First up, try to spawn off as many futures as possible by filling up
+ // our queue of futures. Propagate errors from the stream immediately.
+ while this.in_progress_queue.len() < *this.max {
+ match this.stream.as_mut().poll_next(cx)? {
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
+ Poll::Ready(None) | Poll::Pending => break,
+ }
+ }
+
+ // Attempt to pull the next value from the in_progress_queue
+ match this.in_progress_queue.poll_next_unpin(cx) {
+ x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
+ Poll::Ready(None) => {}
+ }
+
+ // If more values are still coming from the stream, we're not done yet
+ if this.stream.is_done() {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Item, E> Sink<Item> for TryBuffered<S>
+where
+ S: TryStream + Sink<Item, Error = E>,
+ S::Ok: TryFuture<Error = E>,
+{
+ type Error = E;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs
index 76ce619..387de97 100644
--- a/src/stream/try_stream/try_collect.rs
+++ b/src/stream/try_stream/try_collect.rs
@@ -1,23 +1,25 @@
use core::mem;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
use futures_core::stream::{FusedStream, TryStream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`try_collect`](super::TryStreamExt::try_collect) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct TryCollect<St, C> {
- #[pin]
- stream: St,
- items: C,
+pin_project! {
+ /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryCollect<St, C> {
+ #[pin]
+ stream: St,
+ items: C,
+ }
}
impl<St: TryStream, C: Default> TryCollect<St, C> {
- pub(super) fn new(s: St) -> TryCollect<St, C> {
- TryCollect {
+ pub(super) fn new(s: St) -> Self {
+ Self {
stream: s,
items: Default::default(),
}
diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs
index 235a2c5..2451332 100644
--- a/src/stream/try_stream/try_concat.rs
+++ b/src/stream/try_stream/try_concat.rs
@@ -1,17 +1,19 @@
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct TryConcat<St: TryStream> {
- #[pin]
- stream: St,
- accum: Option<St::Ok>,
+pin_project! {
+ /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryConcat<St: TryStream> {
+ #[pin]
+ stream: St,
+ accum: Option<St::Ok>,
+ }
}
impl<St> TryConcat<St>
@@ -19,8 +21,8 @@ where
St: TryStream,
St::Ok: Extend<<St::Ok as IntoIterator>::Item> + IntoIterator + Default,
{
- pub(super) fn new(stream: St) -> TryConcat<St> {
- TryConcat {
+ pub(super) fn new(stream: St) -> Self {
+ Self {
stream,
accum: None,
}
diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs
index 38d060c..eacefd2 100644
--- a/src/stream/try_stream/try_filter.rs
+++ b/src/stream/try_stream/try_filter.rs
@@ -1,25 +1,27 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`try_filter`](super::TryStreamExt::try_filter)
-/// method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct TryFilter<St, Fut, F>
- where St: TryStream
-{
- #[pin]
- stream: St,
- f: F,
- #[pin]
- pending_fut: Option<Fut>,
- pending_item: Option<St::Ok>,
+pin_project! {
+ /// Stream for the [`try_filter`](super::TryStreamExt::try_filter)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryFilter<St, Fut, F>
+ where St: TryStream
+ {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Ok>,
+ }
}
impl<St, Fut, F> fmt::Debug for TryFilter<St, Fut, F>
@@ -41,7 +43,7 @@ impl<St, Fut, F> TryFilter<St, Fut, F>
where St: TryStream
{
pub(super) fn new(stream: St, f: F) -> Self {
- TryFilter {
+ Self {
stream,
f,
pending_fut: None,
diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs
index 6c1e48f..335649b 100644
--- a/src/stream/try_stream/try_filter_map.rs
+++ b/src/stream/try_stream/try_filter_map.rs
@@ -1,22 +1,24 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::{TryFuture};
+use futures_core::ready;
use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map)
-/// method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct TryFilterMap<St, Fut, F> {
- #[pin]
- stream: St,
- f: F,
- #[pin]
- pending: Option<Fut>,
+pin_project! {
+ /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryFilterMap<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending: Option<Fut>,
+ }
}
impl<St, Fut, F> fmt::Debug for TryFilterMap<St, Fut, F>
@@ -34,7 +36,7 @@ where
impl<St, Fut, F> TryFilterMap<St, Fut, F> {
pub(super) fn new(stream: St, f: F) -> Self {
- TryFilterMap { stream, f, pending: None }
+ Self { stream, f, pending: None }
}
delegate_access_inner!(stream, St, ());
@@ -66,8 +68,9 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
Poll::Ready(loop {
if let Some(p) = this.pending.as_mut().as_pin_mut() {
// We have an item in progress, poll that until it's done
- let item = ready!(p.try_poll(cx)?);
+ let res = ready!(p.try_poll(cx));
this.pending.set(None);
+ let item = res?;
if item.is_some() {
break item.map(Ok);
}
diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs
index 5227903..4fc04a0 100644
--- a/src/stream/try_stream/try_flatten.rs
+++ b/src/stream/try_stream/try_flatten.rs
@@ -1,22 +1,24 @@
use core::pin::Pin;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
-#[pin_project]
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct TryFlatten<St>
-where
- St: TryStream,
-{
- #[pin]
- stream: St,
- #[pin]
- next: Option<St::Ok>,
+pin_project! {
+ /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryFlatten<St>
+ where
+ St: TryStream,
+ {
+ #[pin]
+ stream: St,
+ #[pin]
+ next: Option<St::Ok>,
+ }
}
impl<St> TryFlatten<St>
diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs
index abeeea4..1d41e4b 100644
--- a/src/stream/try_stream/try_fold.rs
+++ b/src/stream/try_stream/try_fold.rs
@@ -1,20 +1,22 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future, TryFuture};
+use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`try_fold`](super::TryStreamExt::try_fold) method.
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct TryFold<St, Fut, T, F> {
- #[pin]
- stream: St,
- f: F,
- accum: Option<T>,
- #[pin]
- future: Option<Fut>,
+pin_project! {
+ /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryFold<St, Fut, T, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ accum: Option<T>,
+ #[pin]
+ future: Option<Fut>,
+ }
}
impl<St, Fut, T, F> fmt::Debug for TryFold<St, Fut, T, F>
@@ -37,8 +39,8 @@ where St: TryStream,
F: FnMut(T, St::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = St::Error>,
{
- pub(super) fn new(stream: St, f: F, t: T) -> TryFold<St, Fut, T, F> {
- TryFold {
+ pub(super) fn new(stream: St, f: F, t: T) -> Self {
+ Self {
stream,
f,
accum: Some(t),
diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs
index a00e911..0a814ae 100644
--- a/src/stream/try_stream/try_for_each.rs
+++ b/src/stream/try_stream/try_for_each.rs
@@ -1,19 +1,21 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
+use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct TryForEach<St, Fut, F> {
- #[pin]
- stream: St,
- f: F,
- #[pin]
- future: Option<Fut>,
+pin_project! {
+ /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryForEach<St, Fut, F> {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ future: Option<Fut>,
+ }
}
impl<St, Fut, F> fmt::Debug for TryForEach<St, Fut, F>
@@ -34,8 +36,8 @@ where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = St::Error>,
{
- pub(super) fn new(stream: St, f: F) -> TryForEach<St, Fut, F> {
- TryForEach {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self {
stream,
f,
future: None,
diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs
index db8e505..d2f4b0f 100644
--- a/src/stream/try_stream/try_for_each_concurrent.rs
+++ b/src/stream/try_stream/try_for_each_concurrent.rs
@@ -6,19 +6,20 @@ use core::num::NonZeroUsize;
use futures_core::future::{FusedFuture, Future};
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Future for the
-/// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
-/// method.
-#[pin_project]
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-pub struct TryForEachConcurrent<St, Fut, F> {
- #[pin]
- stream: Option<St>,
- f: F,
- futures: FuturesUnordered<Fut>,
- limit: Option<NonZeroUsize>,
+pin_project! {
+ /// Future for the
+ /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
+ /// method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct TryForEachConcurrent<St, Fut, F> {
+ #[pin]
+ stream: Option<St>,
+ f: F,
+ futures: FuturesUnordered<Fut>,
+ limit: Option<NonZeroUsize>,
+ }
}
impl<St, Fut, F> fmt::Debug for TryForEachConcurrent<St, Fut, F>
@@ -50,8 +51,8 @@ where St: TryStream,
F: FnMut(St::Ok) -> Fut,
Fut: Future<Output = Result<(), St::Error>>,
{
- pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> TryForEachConcurrent<St, Fut, F> {
- TryForEachConcurrent {
+ pub(super) fn new(stream: St, limit: Option<usize>, f: F) -> Self {
+ Self {
stream: Some(stream),
// Note: `limit` = 0 gets ignored.
limit: limit.and_then(NonZeroUsize::new),
diff --git a/src/stream/try_stream/try_next.rs b/src/stream/try_stream/try_next.rs
index 78599ad..1bc00fb 100644
--- a/src/stream/try_stream/try_next.rs
+++ b/src/stream/try_stream/try_next.rs
@@ -15,7 +15,7 @@ impl<St: ?Sized + Unpin> Unpin for TryNext<'_, St> {}
impl<'a, St: ?Sized + TryStream + Unpin> TryNext<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
- TryNext { stream }
+ Self { stream }
}
}
diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs
index 35759d0..0603b10 100644
--- a/src/stream/try_stream/try_skip_while.rs
+++ b/src/stream/try_stream/try_skip_while.rs
@@ -1,24 +1,26 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
+use futures_core::ready;
use futures_core::stream::{Stream, TryStream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
-/// method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct TrySkipWhile<St, Fut, F> where St: TryStream {
- #[pin]
- stream: St,
- f: F,
- #[pin]
- pending_fut: Option<Fut>,
- pending_item: Option<St::Ok>,
- done_skipping: bool,
+pin_project! {
+ /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TrySkipWhile<St, Fut, F> where St: TryStream {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Ok>,
+ done_skipping: bool,
+ }
}
impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F>
@@ -42,8 +44,8 @@ impl<St, Fut, F> TrySkipWhile<St, Fut, F>
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
- pub(super) fn new(stream: St, f: F) -> TrySkipWhile<St, Fut, F> {
- TrySkipWhile {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self {
stream,
f,
pending_fut: None,
@@ -74,9 +76,10 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
Poll::Ready(loop {
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
- let skipped = ready!(fut.try_poll(cx)?);
- let item = this.pending_item.take();
+ let res = ready!(fut.try_poll(cx));
this.pending_fut.set(None);
+ let skipped = res?;
+ let item = this.pending_item.take();
if !skipped {
*this.done_skipping = true;
break item.map(Ok);
diff --git a/src/stream/try_stream/try_take_while.rs b/src/stream/try_stream/try_take_while.rs
index 16bfb20..6241572 100644
--- a/src/stream/try_stream/try_take_while.rs
+++ b/src/stream/try_stream/try_take_while.rs
@@ -1,27 +1,29 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
-/// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while)
-/// method.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct TryTakeWhile<St, Fut, F>
-where
- St: TryStream,
-{
- #[pin]
- stream: St,
- f: F,
- #[pin]
- pending_fut: Option<Fut>,
- pending_item: Option<St::Ok>,
- done_taking: bool,
+pin_project! {
+ /// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while)
+ /// method.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryTakeWhile<St, Fut, F>
+ where
+ St: TryStream,
+ {
+ #[pin]
+ stream: St,
+ f: F,
+ #[pin]
+ pending_fut: Option<Fut>,
+ pending_item: Option<St::Ok>,
+ done_taking: bool,
+ }
}
impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F>
@@ -46,8 +48,8 @@ where
F: FnMut(&St::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = St::Error>,
{
- pub(super) fn new(stream: St, f: F) -> TryTakeWhile<St, Fut, F> {
- TryTakeWhile {
+ pub(super) fn new(stream: St, f: F) -> Self {
+ Self {
stream,
f,
pending_fut: None,
@@ -76,9 +78,10 @@ where
Poll::Ready(loop {
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
- let take = ready!(fut.try_poll(cx)?);
- let item = this.pending_item.take();
+ let res = ready!(fut.try_poll(cx));
this.pending_fut.set(None);
+ let take = res?;
+ let item = this.pending_item.take();
if take {
break item.map(Ok);
} else {
diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs
index a6b31fe..c8fc421 100644
--- a/src/stream/try_stream/try_unfold.rs
+++ b/src/stream/try_stream/try_unfold.rs
@@ -1,9 +1,10 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::TryFuture;
+use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
/// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
///
@@ -66,14 +67,15 @@ where
}
}
-/// Stream for the [`try_unfold`] function.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct TryUnfold<T, F, Fut> {
- f: F,
- state: Option<T>,
- #[pin]
- fut: Option<Fut>,
+pin_project! {
+ /// Stream for the [`try_unfold`] function.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct TryUnfold<T, F, Fut> {
+ f: F,
+ state: Option<T>,
+ #[pin]
+ fut: Option<Fut>,
+ }
}
impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs
index b6f8eae..473bb67 100644
--- a/src/stream/unfold.rs
+++ b/src/stream/unfold.rs
@@ -1,9 +1,11 @@
+use crate::unfold_state::UnfoldState;
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
+use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project::pin_project;
+use pin_project_lite::pin_project;
/// Creates a `Stream` from a seed and a closure returning a `Future`.
///
@@ -45,24 +47,24 @@ use pin_project::pin_project;
/// # });
/// ```
pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
- where F: FnMut(T) -> Fut,
- Fut: Future<Output = Option<(Item, T)>>,
+where
+ F: FnMut(T) -> Fut,
+ Fut: Future<Output = Option<(Item, T)>>,
{
Unfold {
f,
- state: Some(init),
- fut: None,
+ state: UnfoldState::Value { value: init },
}
}
-/// Stream for the [`unfold`] function.
-#[pin_project]
-#[must_use = "streams do nothing unless polled"]
-pub struct Unfold<T, F, Fut> {
- f: F,
- state: Option<T>,
- #[pin]
- fut: Option<Fut>,
+pin_project! {
+ /// Stream for the [`unfold`] function.
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Unfold<T, F, Fut> {
+ f: F,
+ #[pin]
+ state: UnfoldState<T, Fut>,
+ }
}
impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
@@ -73,44 +75,50 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Unfold")
.field("state", &self.state)
- .field("fut", &self.fut)
.finish()
}
}
impl<T, F, Fut, Item> FusedStream for Unfold<T, F, Fut>
- where F: FnMut(T) -> Fut,
- Fut: Future<Output = Option<(Item, T)>>,
+where
+ F: FnMut(T) -> Fut,
+ Fut: Future<Output = Option<(Item, T)>>,
{
fn is_terminated(&self) -> bool {
- self.state.is_none() && self.fut.is_none()
+ if let UnfoldState::Empty = self.state {
+ true
+ } else {
+ false
+ }
}
}
impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
- where F: FnMut(T) -> Fut,
- Fut: Future<Output = Option<(Item, T)>>,
+where
+ F: FnMut(T) -> Fut,
+ Fut: Future<Output = Option<(Item, T)>>,
{
type Item = Item;
- fn poll_next(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
- if let Some(state) = this.state.take() {
- this.fut.set(Some((this.f)(state)));
+ if let Some(state) = this.state.as_mut().take_value() {
+ this.state.set(UnfoldState::Future {
+ future: (this.f)(state),
+ });
}
- let step = ready!(this.fut.as_mut().as_pin_mut()
- .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx));
- this.fut.set(None);
+ let step = match this.state.as_mut().project_future() {
+ Some(fut) => ready!(fut.poll(cx)),
+ None => panic!("Unfold must not be polled after it returned `Poll::Ready(None)`"),
+ };
if let Some((item, next_state)) = step {
- *this.state = Some(next_state);
+ this.state.set(UnfoldState::Value { value: next_state });
Poll::Ready(Some(item))
} else {
+ this.state.set(UnfoldState::Empty);
Poll::Ready(None)
}
}
diff --git a/src/task/mod.rs b/src/task/mod.rs
index fb3b7ad..77e5a96 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -1,4 +1,25 @@
-//! Task notification
+//! Tools for working with tasks.
+//!
+//! This module contains:
+//!
+//! - [`Spawn`], a trait for spawning new tasks.
+//! - [`Context`], a context of an asynchronous task,
+//! including a handle for waking up the task.
+//! - [`Waker`], a handle for waking up a task.
+//!
+//! The remaining types and traits in the module are used for implementing
+//! executors or dealing with synchronization issues around task wakeup.
+
+pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
+
+pub use futures_task::{
+ Spawn, LocalSpawn, SpawnError,
+ FutureObj, LocalFutureObj, UnsafeFutureObj,
+};
+
+pub use futures_task::noop_waker;
+#[cfg(feature = "std")]
+pub use futures_task::noop_waker_ref;
cfg_target_has_atomic! {
#[cfg(feature = "alloc")]
@@ -15,14 +36,3 @@ cfg_target_has_atomic! {
mod spawn;
pub use self::spawn::{SpawnExt, LocalSpawnExt};
-
-pub use futures_core::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
-
-pub use futures_task::{
- Spawn, LocalSpawn, SpawnError,
- FutureObj, LocalFutureObj, UnsafeFutureObj,
-};
-
-pub use futures_task::noop_waker;
-#[cfg(feature = "std")]
-pub use futures_task::noop_waker_ref;
diff --git a/src/unfold_state.rs b/src/unfold_state.rs
new file mode 100644
index 0000000..0edc15e
--- /dev/null
+++ b/src/unfold_state.rs
@@ -0,0 +1,39 @@
+use core::pin::Pin;
+
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// UnfoldState used for stream and sink unfolds
+ #[project = UnfoldStateProj]
+ #[project_replace = UnfoldStateProjReplace]
+ #[derive(Debug)]
+ pub(crate) enum UnfoldState<T, R> {
+ Value {
+ value: T,
+ },
+ Future {
+ #[pin]
+ future: R,
+ },
+ Empty,
+ }
+}
+
+impl<T, R> UnfoldState<T, R> {
+ pub(crate) fn project_future(self: Pin<&mut Self>) -> Option<Pin<&mut R>> {
+ match self.project() {
+ UnfoldStateProj::Future { future } => Some(future),
+ _ => None,
+ }
+ }
+
+ pub(crate) fn take_value(self: Pin<&mut Self>) -> Option<T> {
+ match &*self {
+ UnfoldState::Value { .. } => match self.project_replace(UnfoldState::Empty) {
+ UnfoldStateProjReplace::Value { value } => Some(value),
+ _ => unreachable!(),
+ },
+ _ => None,
+ }
+ }
+}