aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2020-05-18 19:28:07 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2020-05-18 19:28:07 +0000
commit98d4e50b1a70732a4a618c95531863afc46e6c0b (patch)
tree7f4137b4bc72b97c0b12238d9a186963e667f8ac
parent2e1b1c22e9a677755b9678bf7902ff20cd03253f (diff)
parent40ca3c3c763fca1a9508fd9dafc4976b07d88f69 (diff)
downloadfutures-98d4e50b1a70732a4a618c95531863afc46e6c0b.tar.gz
Upgrade rust/crates/futures to 0.3.5 am: 67f2b76b07 am: 00555d39b3 am: 60f4d8afaf am: 40ca3c3c76
Change-Id: I035e7577affac5b3cb43f71f82f3077bdad131e7
-rw-r--r--Cargo.toml34
-rw-r--r--Cargo.toml.orig26
-rw-r--r--METADATA13
-rw-r--r--src/lib.rs83
-rw-r--r--tests/abortable.rs21
-rw-r--r--tests/arc_wake.rs107
-rw-r--r--tests/async_await_macros.rs116
-rw-r--r--tests/atomic_waker.rs19
-rw-r--r--tests/buffer_unordered.rs15
-rw-r--r--tests/eager_drop.rs169
-rw-r--r--tests/eventual.rs1
-rw-r--r--tests/future_try_flatten_stream.rs98
-rw-r--r--tests/futures_ordered.rs31
-rw-r--r--tests/futures_unordered.rs78
-rw-r--r--tests/inspect.rs7
-rw-r--r--tests/io_buf_reader.rs287
-rw-r--r--tests/io_buf_writer.rs222
-rw-r--r--tests/io_cursor.rs20
-rw-r--r--tests/io_lines.rs67
-rw-r--r--tests/io_read.rs58
-rw-r--r--tests/io_read_exact.rs7
-rw-r--r--tests/io_read_line.rs39
-rw-r--r--tests/io_read_to_string.rs39
-rw-r--r--tests/io_read_until.rs40
-rw-r--r--tests/io_window.rs1
-rw-r--r--tests/io_write.rs70
-rw-r--r--tests/join_all.rs44
-rw-r--r--tests/macro_comma_support.rs37
-rw-r--r--tests/mutex.rs31
-rw-r--r--tests/object_safety.rs1
-rw-r--r--tests/oneshot.rs43
-rw-r--r--tests/ready_queue.rs54
-rw-r--r--tests/recurse.rs11
-rw-r--r--tests/select_all.rs9
-rw-r--r--tests/select_ok.rs11
-rw-r--r--tests/shared.rs107
-rw-r--r--tests/sink.rs594
-rw-r--r--tests/sink_fanout.rs13
-rw-r--r--tests/split.rs101
-rw-r--r--tests/stream.rs145
-rw-r--r--tests/stream_catch_unwind.rs11
-rw-r--r--tests/stream_into_async_read.rs98
-rw-r--r--tests/stream_peekable.rs9
-rw-r--r--tests/stream_select_all.rs23
-rw-r--r--tests/stream_select_next_some.rs27
-rw-r--r--tests/try_join.rs2
-rw-r--r--tests/try_join_all.rs42
47 files changed, 1977 insertions, 1104 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 3f93105..290b3cc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "futures"
-version = "0.3.4"
+version = "0.3.5"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3.0"
+documentation = "https://docs.rs/futures/0.3.5"
readme = "../README.md"
keywords = ["futures", "async", "future"]
categories = ["asynchronous"]
@@ -29,35 +29,50 @@ all-features = true
[package.metadata.playground]
features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
[dependencies.futures-channel]
-version = "0.3.4"
+version = "0.3.5"
features = ["sink"]
default-features = false
[dependencies.futures-core]
-version = "0.3.4"
+version = "0.3.5"
default-features = false
[dependencies.futures-executor]
-version = "0.3.4"
+version = "0.3.5"
optional = true
default-features = false
[dependencies.futures-io]
-version = "0.3.4"
+version = "0.3.5"
default-features = false
[dependencies.futures-sink]
-version = "0.3.4"
+version = "0.3.5"
default-features = false
[dependencies.futures-task]
-version = "0.3.4"
+version = "0.3.5"
default-features = false
[dependencies.futures-util]
-version = "0.3.4"
+version = "0.3.5"
features = ["sink"]
default-features = false
+[dev-dependencies.assert_matches]
+version = "1.3.0"
+
+[dev-dependencies.futures-executor]
+version = "0.3.5"
+features = ["thread-pool"]
+
+[dev-dependencies.futures-test]
+version = "0.3.5"
+
+[dev-dependencies.pin-utils]
+version = "0.1.0"
+
+[dev-dependencies.tokio]
+version = "0.1.11"
[features]
alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
@@ -72,5 +87,6 @@ read-initializer = ["futures-io/read-initializer", "futures-util/read-initialize
std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"]
thread-pool = ["executor", "futures-executor/thread-pool"]
unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
+write-all-vectored = ["futures-util/write-all-vectored"]
[badges.travis-ci]
repository = "rust-lang/futures-rs"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 7095c19..c7288ed 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,14 +1,14 @@
[package]
name = "futures"
edition = "2018"
-version = "0.3.4"
+version = "0.3.5"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT OR Apache-2.0"
readme = "../README.md"
keywords = ["futures", "async", "future"]
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3.0"
+documentation = "https://docs.rs/futures/0.3.5"
description = """
An implementation of futures and streams featuring zero allocations,
composability, and iterator-like interfaces.
@@ -19,13 +19,20 @@ categories = ["asynchronous"]
travis-ci = { repository = "rust-lang/futures-rs" }
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.4", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.4", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.4", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.4", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.4", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.4", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.4", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.5", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.5", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.5", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.5", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.5", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.5", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.5", default-features = false, features = ["sink"] }
+
+[dev-dependencies]
+pin-utils = "0.1.0"
+futures-executor = { path = "../futures-executor", version = "0.3.5", features = ["thread-pool"] }
+futures-test = { path = "../futures-test", version = "0.3.5" }
+tokio = "0.1.11"
+assert_matches = "1.3.0"
[features]
default = ["std", "async-await", "executor"]
@@ -44,6 +51,7 @@ unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/u
cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
bilock = ["futures-util/bilock"]
read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
+write-all-vectored = ["futures-util/write-all-vectored"]
[package.metadata.docs.rs]
all-features = true
diff --git a/METADATA b/METADATA
index 42d8dba..06ea8d4 100644
--- a/METADATA
+++ b/METADATA
@@ -1,8 +1,5 @@
name: "futures"
-description:
- "An implementation of futures and streams featuring zero allocations, "
- "composability, and iterator-like interfaces."
-
+description: "An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces."
third_party {
url {
type: HOMEPAGE
@@ -12,7 +9,11 @@ third_party {
type: GIT
value: "https://github.com/rust-lang/futures-rs"
}
- version: "0.3.4"
- last_upgrade_date { year: 2020 month: 3 day: 17 }
+ version: "0.3.5"
license_type: NOTICE
+ last_upgrade_date {
+ year: 2020
+ month: 5
+ day: 8
+ }
}
diff --git a/src/lib.rs b/src/lib.rs
index 3cdb3d3..e6380d2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -90,7 +90,7 @@
#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
-#![doc(html_root_url = "https://docs.rs/futures/0.3.0")]
+#![doc(html_root_url = "https://docs.rs/futures/0.3.5")]
#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
@@ -118,9 +118,11 @@ compile_error!("The `read-initializer` feature requires the `unstable` feature a
// Macro reexports
pub use futures_core::ready; // Readiness propagation
pub use futures_util::pin_mut;
+#[cfg(feature = "async-await")]
+pub use futures_util::{pending, poll, join, try_join, select_biased}; // Async-await
#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
-pub use futures_util::{pending, poll}; // Async-await
+pub use futures_util::select;
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
@@ -332,7 +334,7 @@ pub mod io {
BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf,
empty, Empty, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf,
ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat,
- Repeat, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf,
+ Repeat, ReuniteError, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf,
WriteVectored,
};
}
@@ -442,8 +444,8 @@ pub mod stream {
try_unfold, TryUnfold,
StreamExt,
- Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold,
- Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next,
+ Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten,
+ Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next,
SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeWhile,
Then, Zip,
@@ -458,7 +460,7 @@ pub mod stream {
#[cfg(feature = "alloc")]
pub use futures_util::stream::{
// For StreamExt:
- Chunks,
+ Chunks, ReadyChunks,
};
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
@@ -534,72 +536,3 @@ pub mod never {
pub use futures_util::never::Never;
}
-
-// proc-macro re-export --------------------------------------
-
-// Not public API.
-#[doc(hidden)]
-pub use futures_core::core_reexport;
-
-// Not public API.
-#[cfg(feature = "async-await")]
-#[doc(hidden)]
-pub use futures_util::async_await;
-
-// Not public API.
-#[cfg(feature = "async-await")]
-#[doc(hidden)]
-pub mod inner_macro {
- pub use futures_util::join;
- pub use futures_util::try_join;
- #[cfg(feature = "std")]
- pub use futures_util::select;
- pub use futures_util::select_biased;
-}
-
-#[cfg(feature = "async-await")]
-futures_util::document_join_macro! {
- #[macro_export]
- macro_rules! join { // replace `::futures_util` with `::futures` as the crate path
- ($($tokens:tt)*) => {
- $crate::inner_macro::join! {
- futures_crate_path ( ::futures )
- $( $tokens )*
- }
- }
- }
-
- #[macro_export]
- macro_rules! try_join { // replace `::futures_util` with `::futures` as the crate path
- ($($tokens:tt)*) => {
- $crate::inner_macro::try_join! {
- futures_crate_path ( ::futures )
- $( $tokens )*
- }
- }
- }
-}
-
-#[cfg(feature = "async-await")]
-futures_util::document_select_macro! {
- #[cfg(feature = "std")]
- #[macro_export]
- macro_rules! select { // replace `::futures_util` with `::futures` as the crate path
- ($($tokens:tt)*) => {
- $crate::inner_macro::select! {
- futures_crate_path ( ::futures )
- $( $tokens )*
- }
- }
- }
-
- #[macro_export]
- macro_rules! select_biased { // replace `::futures_util` with `::futures` as the crate path
- ($($tokens:tt)*) => {
- $crate::inner_macro::select_biased! {
- futures_crate_path ( ::futures )
- $( $tokens )*
- }
- }
- }
-}
diff --git a/tests/abortable.rs b/tests/abortable.rs
index 5925c9a..fcbabe9 100644
--- a/tests/abortable.rs
+++ b/tests/abortable.rs
@@ -1,11 +1,10 @@
-use futures::channel::oneshot;
-use futures::executor::block_on;
-use futures::future::{abortable, Aborted, FutureExt};
-use futures::task::{Context, Poll};
-use futures_test::task::new_count_waker;
-
+#[cfg(all(feature = "alloc", feature = "executor"))]
#[test]
fn abortable_works() {
+ use futures::channel::oneshot;
+ use futures::future::{abortable, Aborted};
+ use futures::executor::block_on;
+
let (_tx, a_rx) = oneshot::channel::<()>();
let (abortable_rx, abort_handle) = abortable(a_rx);
@@ -13,8 +12,14 @@ fn abortable_works() {
assert_eq!(Err(Aborted), block_on(abortable_rx));
}
+#[cfg(all(feature = "alloc", feature = "executor"))]
#[test]
fn abortable_awakens() {
+ use futures::channel::oneshot;
+ use futures::future::{abortable, Aborted, FutureExt};
+ use futures::task::{Context, Poll};
+ use futures_test::task::new_count_waker;
+
let (_tx, a_rx) = oneshot::channel::<()>();
let (mut abortable_rx, abort_handle) = abortable(a_rx);
@@ -28,8 +33,12 @@ fn abortable_awakens() {
assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx));
}
+#[cfg(all(feature = "alloc", feature = "executor"))]
#[test]
fn abortable_resolves() {
+ use futures::channel::oneshot;
+ use futures::future::abortable;
+ use futures::executor::block_on;
let (tx, a_rx) = oneshot::channel::<()>();
let (abortable_rx, _abort_handle) = abortable(a_rx);
diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs
index 1940e4f..38217f0 100644
--- a/tests/arc_wake.rs
+++ b/tests/arc_wake.rs
@@ -1,60 +1,79 @@
-use futures::task::{self, ArcWake, Waker};
-use std::sync::{Arc, Mutex};
+#[cfg(feature = "alloc")]
+mod countingwaker {
+ use futures::task::{self, ArcWake, Waker};
+ use std::sync::{Arc, Mutex};
-struct CountingWaker {
- nr_wake: Mutex<i32>,
-}
+ struct CountingWaker {
+ nr_wake: Mutex<i32>,
+ }
-impl CountingWaker {
- fn new() -> CountingWaker {
- CountingWaker {
- nr_wake: Mutex::new(0),
+ impl CountingWaker {
+ fn new() -> CountingWaker {
+ CountingWaker {
+ nr_wake: Mutex::new(0),
+ }
}
- }
- fn wakes(&self) -> i32 {
- *self.nr_wake.lock().unwrap()
+ fn wakes(&self) -> i32 {
+ *self.nr_wake.lock().unwrap()
+ }
}
-}
-impl ArcWake for CountingWaker {
- fn wake_by_ref(arc_self: &Arc<Self>) {
- let mut lock = arc_self.nr_wake.lock().unwrap();
- *lock += 1;
+ impl ArcWake for CountingWaker {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ let mut lock = arc_self.nr_wake.lock().unwrap();
+ *lock += 1;
+ }
}
-}
-#[test]
-fn create_waker_from_arc() {
- let some_w = Arc::new(CountingWaker::new());
+ #[test]
+ fn create_from_arc() {
+ let some_w = Arc::new(CountingWaker::new());
- let w1: Waker = task::waker(some_w.clone());
- assert_eq!(2, Arc::strong_count(&some_w));
- w1.wake_by_ref();
- assert_eq!(1, some_w.wakes());
+ let w1: Waker = task::waker(some_w.clone());
+ assert_eq!(2, Arc::strong_count(&some_w));
+ w1.wake_by_ref();
+ assert_eq!(1, some_w.wakes());
- let w2 = w1.clone();
- assert_eq!(3, Arc::strong_count(&some_w));
+ let w2 = w1.clone();
+ assert_eq!(3, Arc::strong_count(&some_w));
- w2.wake_by_ref();
- assert_eq!(2, some_w.wakes());
+ w2.wake_by_ref();
+ assert_eq!(2, some_w.wakes());
- drop(w2);
- assert_eq!(2, Arc::strong_count(&some_w));
- drop(w1);
- assert_eq!(1, Arc::strong_count(&some_w));
-}
+ drop(w2);
+ assert_eq!(2, Arc::strong_count(&some_w));
+ drop(w1);
+ assert_eq!(1, Arc::strong_count(&some_w));
+ }
-struct PanicWaker;
+ #[test]
+ fn ref_wake_same() {
+ let some_w = Arc::new(CountingWaker::new());
-impl ArcWake for PanicWaker {
- fn wake_by_ref(_arc_self: &Arc<Self>) {
- panic!("WAKE UP");
+ let w1: Waker = task::waker(some_w.clone());
+ let w2 = task::waker_ref(&some_w);
+ let w3 = w2.clone();
+
+ assert!(w1.will_wake(&w2));
+ assert!(w2.will_wake(&w3));
}
}
+#[cfg(feature = "alloc")]
#[test]
fn proper_refcount_on_wake_panic() {
+ use futures::task::{self, ArcWake, Waker};
+ use std::sync::Arc;
+
+ struct PanicWaker;
+
+ impl ArcWake for PanicWaker {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {
+ panic!("WAKE UP");
+ }
+ }
+
let some_w = Arc::new(PanicWaker);
let w1: Waker = task::waker(some_w.clone());
@@ -63,15 +82,3 @@ fn proper_refcount_on_wake_panic() {
drop(w1);
assert_eq!(1, Arc::strong_count(&some_w)); // some_w
}
-
-#[test]
-fn waker_ref_wake_same() {
- let some_w = Arc::new(CountingWaker::new());
-
- let w1: Waker = task::waker(some_w.clone());
- let w2 = task::waker_ref(&some_w);
- let w3 = w2.clone();
-
- assert!(w1.will_wake(&w2));
- assert!(w2.will_wake(&w3));
-}
diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs
index bc717df..fd2a349 100644
--- a/tests/async_await_macros.rs
+++ b/tests/async_await_macros.rs
@@ -1,15 +1,12 @@
#![recursion_limit="128"]
-use futures::{pending, pin_mut, poll, join, try_join, select};
-use futures::channel::{mpsc, oneshot};
-use futures::executor::block_on;
-use futures::future::{self, FutureExt, poll_fn};
-use futures::sink::SinkExt;
-use futures::stream::StreamExt;
-use futures::task::{Context, Poll};
-
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn poll_and_pending() {
+ use futures::{pending, pin_mut, poll};
+ use futures::executor::block_on;
+ use futures::task::Poll;
+
let pending_once = async { pending!() };
block_on(async {
pin_mut!(pending_once);
@@ -18,8 +15,14 @@ fn poll_and_pending() {
});
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn join() {
+ use futures::{pin_mut, poll, join};
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::task::Poll;
+
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = oneshot::channel::<i32>();
@@ -38,8 +41,14 @@ fn join() {
});
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select() {
+ use futures::select;
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+
let (tx1, rx1) = oneshot::channel::<i32>();
let (_tx2, rx2) = oneshot::channel::<i32>();
tx1.send(1).unwrap();
@@ -56,8 +65,12 @@ fn select() {
assert!(ran);
}
+#[cfg(all(feature = "alloc", feature = "executor", feature = "async-await"))]
#[test]
fn select_biased() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
use futures::select_biased;
let (tx1, rx1) = oneshot::channel::<i32>();
@@ -76,8 +89,15 @@ fn select_biased() {
assert!(ran);
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_streams() {
+ use futures::select;
+ use futures::channel::mpsc;
+ use futures::executor::block_on;
+ use futures::sink::SinkExt;
+ use futures::stream::StreamExt;
+
let (mut tx1, rx1) = mpsc::channel::<i32>(1);
let (mut tx2, rx2) = mpsc::channel::<i32>(1);
let mut rx1 = rx1.fuse();
@@ -119,8 +139,14 @@ fn select_streams() {
assert!(ran);
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_can_move_uncompleted_futures() {
+ use futures::select;
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = oneshot::channel::<i32>();
tx1.send(1).unwrap();
@@ -145,8 +171,13 @@ fn select_can_move_uncompleted_futures() {
assert!(ran);
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_nested() {
+ use futures::select;
+ use futures::executor::block_on;
+ use futures::future;
+
let mut outer_fut = future::ready(1);
let mut inner_fut = future::ready(2);
let res = block_on(async {
@@ -161,8 +192,12 @@ fn select_nested() {
assert_eq!(res, 3);
}
+#[cfg(all(feature = "async-await", feature = "std"))]
#[test]
fn select_size() {
+ use futures::select;
+ use futures::future;
+
let fut = async {
let mut ready = future::ready(0i32);
select! {
@@ -182,8 +217,13 @@ fn select_size() {
assert_eq!(::std::mem::size_of_val(&fut), 40);
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_on_non_unpin_expressions() {
+ use futures::select;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+
// The returned Future is !Unpin
let make_non_unpin_fut = || { async {
5
@@ -200,8 +240,13 @@ fn select_on_non_unpin_expressions() {
assert_eq!(res, 5);
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_on_non_unpin_expressions_with_default() {
+ use futures::select;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+
// The returned Future is !Unpin
let make_non_unpin_fut = || { async {
5
@@ -219,8 +264,12 @@ fn select_on_non_unpin_expressions_with_default() {
assert_eq!(res, 5);
}
+#[cfg(all(feature = "async-await", feature = "std"))]
#[test]
fn select_on_non_unpin_size() {
+ use futures::select;
+ use futures::future::FutureExt;
+
// The returned Future is !Unpin
let make_non_unpin_fut = || { async {
5
@@ -235,11 +284,16 @@ fn select_on_non_unpin_size() {
select_res
};
- assert_eq!(48, std::mem::size_of_val(&fut));
+ assert_eq!(32, std::mem::size_of_val(&fut));
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_can_be_used_as_expression() {
+ use futures::select;
+ use futures::executor::block_on;
+ use futures::future;
+
block_on(async {
let res = select! {
x = future::ready(7) => { x },
@@ -249,8 +303,14 @@ fn select_can_be_used_as_expression() {
});
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_with_default_can_be_used_as_expression() {
+ use futures::select;
+ use futures::executor::block_on;
+ use futures::future::{FutureExt, poll_fn};
+ use futures::task::{Context, Poll};
+
fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> {
Poll::Pending
}
@@ -265,8 +325,13 @@ fn select_with_default_can_be_used_as_expression() {
});
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_with_complete_can_be_used_as_expression() {
+ use futures::select;
+ use futures::executor::block_on;
+ use futures::future;
+
block_on(async {
let res = select! {
x = future::pending::<i32>() => { x },
@@ -278,11 +343,16 @@ fn select_with_complete_can_be_used_as_expression() {
});
}
-async fn require_mutable(_: &mut i32) {}
-async fn async_noop() {}
-
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
+ use futures::select;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+
+ async fn require_mutable(_: &mut i32) {}
+ async fn async_noop() {}
+
block_on(async {
let mut value = 234;
select! {
@@ -294,8 +364,16 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
});
}
+#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))]
#[test]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
+ use futures::select;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+
+ async fn require_mutable(_: &mut i32) {}
+ async fn async_noop() {}
+
block_on(async {
let mut value = 234;
select! {
@@ -310,8 +388,12 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
});
}
+#[cfg(feature = "async-await")]
#[test]
fn join_size() {
+ use futures::join;
+ use futures::future;
+
let fut = async {
let ready = future::ready(0i32);
join!(ready)
@@ -326,8 +408,12 @@ fn join_size() {
assert_eq!(::std::mem::size_of_val(&fut), 28);
}
+#[cfg(feature = "async-await")]
#[test]
fn try_join_size() {
+ use futures::try_join;
+ use futures::future;
+
let fut = async {
let ready = future::ready(Ok::<i32, i32>(0));
try_join!(ready)
@@ -342,15 +428,21 @@ fn try_join_size() {
assert_eq!(::std::mem::size_of_val(&fut), 28);
}
+#[cfg(feature = "async-await")]
#[test]
fn join_doesnt_require_unpin() {
+ use futures::join;
+
let _ = async {
join!(async {}, async {})
};
}
+#[cfg(feature = "async-await")]
#[test]
fn try_join_doesnt_require_unpin() {
+ use futures::try_join;
+
let _ = async {
try_join!(
async { Ok::<(), ()>(()) },
diff --git a/tests/atomic_waker.rs b/tests/atomic_waker.rs
index d9ce753..5693bd0 100644
--- a/tests/atomic_waker.rs
+++ b/tests/atomic_waker.rs
@@ -1,14 +1,15 @@
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use std::thread;
-
-use futures::executor::block_on;
-use futures::future::poll_fn;
-use futures::task::{AtomicWaker, Poll};
-
+#[cfg(feature = "executor")]
#[test]
fn basic() {
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::atomic::Ordering;
+ use std::sync::Arc;
+ use std::thread;
+
+ use futures::executor::block_on;
+ use futures::future::poll_fn;
+ use futures::task::{AtomicWaker, Poll};
+
let atomic_waker = Arc::new(AtomicWaker::new());
let atomic_waker_copy = atomic_waker.clone();
diff --git a/tests/buffer_unordered.rs b/tests/buffer_unordered.rs
index 1c559c8..6485a1e 100644
--- a/tests/buffer_unordered.rs
+++ b/tests/buffer_unordered.rs
@@ -1,13 +1,14 @@
-use futures::channel::{oneshot, mpsc};
-use futures::executor::{block_on, block_on_stream};
-use futures::sink::SinkExt;
-use futures::stream::StreamExt;
-use std::sync::mpsc as std_mpsc;
-use std::thread;
-
+#[cfg(all(feature = "alloc", feature = "std", feature = "executor"))]
#[test]
#[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790
fn works() {
+ use futures::channel::{oneshot, mpsc};
+ use futures::executor::{block_on, block_on_stream};
+ use futures::sink::SinkExt;
+ use futures::stream::StreamExt;
+ use std::sync::mpsc as std_mpsc;
+ use std::thread;
+
const N: usize = 4;
let (mut tx, rx) = mpsc::channel(1);
diff --git a/tests/eager_drop.rs b/tests/eager_drop.rs
index 674e401..bfb60a7 100644
--- a/tests/eager_drop.rs
+++ b/tests/eager_drop.rs
@@ -1,13 +1,9 @@
-use futures::channel::oneshot;
-use futures::future::{self, Future, FutureExt, TryFutureExt};
-use futures::task::{Context, Poll};
-use futures_test::future::FutureTestExt;
-use pin_utils::unsafe_pinned;
-use std::pin::Pin;
-use std::sync::mpsc;
-
#[test]
fn map_ok() {
+ use futures::future::{self, FutureExt, TryFutureExt};
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+
// The closure given to `map_ok` should have been dropped by the time `map`
// runs.
let (tx1, rx1) = mpsc::channel::<()>();
@@ -26,6 +22,10 @@ fn map_ok() {
#[test]
fn map_err() {
+ use futures::future::{self, FutureExt, TryFutureExt};
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+
// The closure given to `map_err` should have been dropped by the time `map`
// runs.
let (tx1, rx1) = mpsc::channel::<()>();
@@ -42,76 +42,101 @@ fn map_err() {
rx2.recv().unwrap();
}
-struct FutureData<F, T> {
- _data: T,
- future: F,
-}
+mod channelled {
+ use pin_utils::unsafe_pinned;
+ use futures::future::Future;
+ use std::pin::Pin;
+ use futures::task::{Context,Poll};
-impl<F, T> FutureData<F, T> {
- unsafe_pinned!(future: F);
-}
-
-impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
- type Output = F::Output;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
- self.future().poll(cx)
+ struct FutureData<F, T> {
+ _data: T,
+ future: F,
}
-}
-
-#[test]
-fn then_drops_eagerly() {
- let (tx0, rx0) = oneshot::channel::<()>();
- let (tx1, rx1) = mpsc::channel::<()>();
- let (tx2, rx2) = mpsc::channel::<()>();
-
- FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
- .then(move |_| {
- assert!(rx1.recv().is_err()); // tx1 should have been dropped
- tx2.send(()).unwrap();
- future::ready(())
- })
- .run_in_background();
-
- assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
- tx0.send(()).unwrap();
- rx2.recv().unwrap();
-}
-#[test]
-fn and_then_drops_eagerly() {
- let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
- let (tx1, rx1) = mpsc::channel::<()>();
- let (tx2, rx2) = mpsc::channel::<()>();
+ impl<F, T> FutureData<F, T> {
+ unsafe_pinned!(future: F);
+ }
- FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
- .and_then(move |_| {
- assert!(rx1.recv().is_err()); // tx1 should have been dropped
- tx2.send(()).unwrap();
- future::ready(Ok(()))
- })
- .run_in_background();
+ impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
+ type Output = F::Output;
- assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
- tx0.send(Ok(())).unwrap();
- rx2.recv().unwrap();
-}
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
+ self.future().poll(cx)
+ }
+ }
-#[test]
-fn or_else_drops_eagerly() {
- let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
- let (tx1, rx1) = mpsc::channel::<()>();
- let (tx2, rx2) = mpsc::channel::<()>();
+ #[cfg(feature = "alloc")]
+ #[test]
+ fn then_drops_eagerly() {
+ use futures::channel::oneshot;
+ use futures::future::{self, FutureExt, TryFutureExt};
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+
+ let (tx0, rx0) = oneshot::channel::<()>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .then(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready(())
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(()).unwrap();
+ rx2.recv().unwrap();
+ }
- FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
- .or_else(move |_| {
- assert!(rx1.recv().is_err()); // tx1 should have been dropped
- tx2.send(()).unwrap();
- future::ready::<Result<(), ()>>(Ok(()))
- })
- .run_in_background();
+ #[cfg(feature = "alloc")]
+ #[test]
+ fn and_then_drops_eagerly() {
+ use futures::channel::oneshot;
+ use futures::future::{self, TryFutureExt};
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+
+ let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .and_then(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready(Ok(()))
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(Ok(())).unwrap();
+ rx2.recv().unwrap();
+ }
- assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
- tx0.send(Err(())).unwrap();
- rx2.recv().unwrap();
+ #[cfg(feature = "alloc")]
+ #[test]
+ fn or_else_drops_eagerly() {
+ use futures::channel::oneshot;
+ use futures::future::{self, TryFutureExt};
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+
+ let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .or_else(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready::<Result<(), ()>>(Ok(()))
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(Err(())).unwrap();
+ rx2.recv().unwrap();
+ }
}
diff --git a/tests/eventual.rs b/tests/eventual.rs
index bff000d..6835588 100644
--- a/tests/eventual.rs
+++ b/tests/eventual.rs
@@ -1,3 +1,4 @@
+#![cfg(all(feature = "executor", feature = "thread-pool"))]
use futures::channel::oneshot;
use futures::executor::ThreadPool;
use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
diff --git a/tests/future_try_flatten_stream.rs b/tests/future_try_flatten_stream.rs
index 082c5ef..aa85ed0 100644
--- a/tests/future_try_flatten_stream.rs
+++ b/tests/future_try_flatten_stream.rs
@@ -1,13 +1,10 @@
-use core::marker::PhantomData;
-use core::pin::Pin;
-use futures::executor::block_on_stream;
-use futures::future::{ok, err, TryFutureExt};
-use futures::sink::Sink;
-use futures::stream::{self, Stream, StreamExt};
-use futures::task::{Context, Poll};
-
+#[cfg(feature = "executor")]
#[test]
fn successful_future() {
+ use futures::executor::block_on_stream;
+ use futures::future::{ok, TryFutureExt};
+ use futures::stream::{self, StreamExt};
+
let stream_items = vec![17, 19];
let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok));
@@ -19,20 +16,28 @@ fn successful_future() {
assert_eq!(None, iter.next());
}
-struct PanickingStream<T, E> {
- _marker: PhantomData<(T, E)>
-}
+#[cfg(feature = "executor")]
+#[test]
+fn failed_future() {
+ use core::marker::PhantomData;
+ use core::pin::Pin;
+ use futures::executor::block_on_stream;
+ use futures::future::{err, TryFutureExt};
+ use futures::stream::Stream;
+ use futures::task::{Context, Poll};
-impl<T, E> Stream for PanickingStream<T, E> {
- type Item = Result<T, E>;
+ struct PanickingStream<T, E> {
+ _marker: PhantomData<(T, E)>
+ }
+
+ impl<T, E> Stream for PanickingStream<T, E> {
+ type Item = Result<T, E>;
- fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- panic!()
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ panic!()
+ }
}
-}
-#[test]
-fn failed_future() {
let future_of_a_stream = err::<PanickingStream<bool, u32>, _>(10);
let stream = future_of_a_stream.try_flatten_stream();
let mut iter = block_on_stream(stream);
@@ -40,37 +45,44 @@ fn failed_future() {
assert_eq!(None, iter.next());
}
-struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>);
+#[test]
+fn assert_impls() {
+ use core::marker::PhantomData;
+ use core::pin::Pin;
+ use futures::sink::Sink;
+ use futures::stream::Stream;
+ use futures::task::{Context, Poll};
+ use futures::future::{ok, TryFutureExt};
-impl<T, E, Item> Stream for StreamSink<T, E, Item> {
- type Item = Result<T, E>;
- fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- panic!()
- }
-}
+ struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>);
-impl<T, E, Item> Sink<Item> for StreamSink<T, E, Item> {
- type Error = E;
- fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- panic!()
- }
- fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> {
- panic!()
+ impl<T, E, Item> Stream for StreamSink<T, E, Item> {
+ type Item = Result<T, E>;
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ panic!()
+ }
}
- fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- panic!()
- }
- fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- panic!()
+
+ impl<T, E, Item> Sink<Item> for StreamSink<T, E, Item> {
+ type Error = E;
+ fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
+ fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> {
+ panic!()
+ }
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
}
-}
-fn assert_stream<S: Stream>(_: &S) {}
-fn assert_sink<S: Sink<Item>, Item>(_: &S) {}
-fn assert_stream_sink<S: Stream + Sink<Item>, Item>(_: &S) {}
+ fn assert_stream<S: Stream>(_: &S) {}
+ fn assert_sink<S: Sink<Item>, Item>(_: &S) {}
+ fn assert_stream_sink<S: Stream + Sink<Item>, Item>(_: &S) {}
-#[test]
-fn assert_impls() {
let s = ok(StreamSink::<(), (), ()>(PhantomData)).try_flatten_stream();
assert_stream(&s);
assert_sink(&s);
diff --git a/tests/futures_ordered.rs b/tests/futures_ordered.rs
index d06b62f..74a220a 100644
--- a/tests/futures_ordered.rs
+++ b/tests/futures_ordered.rs
@@ -1,12 +1,11 @@
-use futures::channel::oneshot;
-use futures::executor::{block_on, block_on_stream};
-use futures::future::{self, join, Future, FutureExt, TryFutureExt};
-use futures::stream::{StreamExt, FuturesOrdered};
-use futures_test::task::noop_context;
-use std::any::Any;
-
+#[cfg(all(feature = "alloc", feature="executor"))]
#[test]
fn works_1() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on_stream;
+ use futures::stream::{StreamExt, FuturesOrdered};
+ use futures_test::task::noop_context;
+
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -26,8 +25,14 @@ fn works_1() {
assert_eq!(None, iter.next());
}
+#[cfg(feature = "alloc")]
#[test]
fn works_2() {
+ use futures::channel::oneshot;
+ use futures::future::{join, FutureExt};
+ use futures::stream::{StreamExt, FuturesOrdered};
+ use futures_test::task::noop_context;
+
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -46,8 +51,13 @@ fn works_2() {
assert!(stream.poll_next_unpin(&mut cx).is_ready());
}
+#[cfg(feature = "executor")]
#[test]
fn from_iterator() {
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::stream::{StreamExt, FuturesOrdered};
+
let stream = vec![
future::ready::<i32>(1),
future::ready::<i32>(2),
@@ -57,8 +67,15 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
}
+#[cfg(feature = "alloc")]
#[test]
fn queue_never_unblocked() {
+ use futures::channel::oneshot;
+ use futures::future::{self, Future, TryFutureExt};
+ use futures::stream::{StreamExt, FuturesOrdered};
+ use futures_test::task::noop_context;
+ use std::any::Any;
+
let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();
diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs
index 57eb98f..3285903 100644
--- a/tests/futures_unordered.rs
+++ b/tests/futures_unordered.rs
@@ -1,18 +1,11 @@
-use std::marker::Unpin;
-use std::pin::Pin;
-use std::sync::atomic::{AtomicBool, Ordering};
-
-use futures::channel::oneshot;
-use futures::executor::{block_on, block_on_stream};
-use futures::future::{self, join, Future, FutureExt};
-use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
-use futures::task::{Context, Poll};
-use futures_test::future::FutureTestExt;
-use futures_test::task::noop_context;
-use futures_test::{assert_stream_done, assert_stream_next};
-
+#[cfg(feature = "alloc")]
#[test]
fn is_terminated() {
+ use futures::future;
+ use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
+ use futures::task::Poll;
+ use futures_test::task::noop_context;
+
let mut cx = noop_context();
let mut tasks = FuturesUnordered::new();
@@ -38,8 +31,13 @@ fn is_terminated() {
assert_eq!(tasks.is_terminated(), true);
}
+#[cfg(all(feature = "alloc", feature = "executor"))]
#[test]
fn works_1() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on_stream;
+ use futures::stream::FuturesUnordered;
+
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -60,8 +58,15 @@ fn works_1() {
assert_eq!(None, iter.next());
}
+#[cfg(feature = "alloc")]
#[test]
fn works_2() {
+ use futures::channel::oneshot;
+ use futures::future::{join, FutureExt};
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures::task::Poll;
+ use futures_test::task::noop_context;
+
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -83,8 +88,13 @@ fn works_2() {
assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(None));
}
+#[cfg(feature = "executor")]
#[test]
fn from_iterator() {
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::stream::{FuturesUnordered, StreamExt};
+
let stream = vec![
future::ready::<i32>(1),
future::ready::<i32>(2),
@@ -96,8 +106,15 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
+#[cfg(feature = "alloc")]
#[test]
fn finished_future() {
+ use std::marker::Unpin;
+ use futures::channel::oneshot;
+ use futures::future::{self, Future, FutureExt};
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures_test::task::noop_context;
+
let (_a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -121,8 +138,13 @@ fn finished_future() {
assert!(stream.poll_next_unpin(cx).is_pending());
}
+#[cfg(all(feature = "alloc", feature = "executor"))]
#[test]
fn iter_mut_cancel() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on_stream;
+ use futures::stream::FuturesUnordered;
+
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -147,8 +169,12 @@ fn iter_mut_cancel() {
assert_eq!(iter.next(), None);
}
+#[cfg(feature = "alloc")]
#[test]
fn iter_mut_len() {
+ use futures::future;
+ use futures::stream::FuturesUnordered;
+
let mut stream = vec![
future::pending::<()>(),
future::pending::<()>(),
@@ -168,8 +194,18 @@ fn iter_mut_len() {
assert!(iter_mut.next().is_none());
}
+#[cfg(feature = "executor")]
#[test]
fn iter_cancel() {
+ use std::marker::Unpin;
+ use std::pin::Pin;
+ use std::sync::atomic::{AtomicBool, Ordering};
+
+ use futures::executor::block_on_stream;
+ use futures::future::{self, Future, FutureExt};
+ use futures::stream::FuturesUnordered;
+ use futures::task::{Context, Poll};
+
struct AtomicCancel<F> {
future: F,
cancel: AtomicBool,
@@ -213,8 +249,12 @@ fn iter_cancel() {
assert_eq!(iter.next(), None);
}
+#[cfg(feature = "alloc")]
#[test]
fn iter_len() {
+ use futures::future;
+ use futures::stream::FuturesUnordered;
+
let stream = vec![
future::pending::<()>(),
future::pending::<()>(),
@@ -234,8 +274,14 @@ fn iter_len() {
assert!(iter.next().is_none());
}
+#[cfg(feature = "alloc")]
#[test]
fn futures_not_moved_after_poll() {
+ use futures::future;
+ use futures::stream::FuturesUnordered;
+ use futures_test::future::FutureTestExt;
+ use futures_test::{assert_stream_done, assert_stream_next};
+
// Future that will be ready after being polled twice,
// asserting that it does not move.
let fut = future::ready(()).pending_once().assert_unmoved();
@@ -246,8 +292,14 @@ fn futures_not_moved_after_poll() {
assert_stream_done!(stream);
}
+#[cfg(feature = "alloc")]
#[test]
fn len_valid_during_out_of_order_completion() {
+ use futures::channel::oneshot;
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures::task::Poll;
+ use futures_test::task::noop_context;
+
// Complete futures out-of-order and add new futures afterwards to ensure
// length values remain correct.
let (a_tx, a_rx) = oneshot::channel::<i32>();
diff --git a/tests/inspect.rs b/tests/inspect.rs
index 42f6f73..4cbe477 100644
--- a/tests/inspect.rs
+++ b/tests/inspect.rs
@@ -1,8 +1,9 @@
-use futures::executor::block_on;
-use futures::future::{self, FutureExt};
-
+#[cfg(feature = "executor")]
#[test]
fn smoke() {
+ use futures::executor::block_on;
+ use futures::future::{self, FutureExt};
+
let mut counter = 0;
{
diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs
index a3d723a..07d934d 100644
--- a/tests/io_buf_reader.rs
+++ b/tests/io_buf_reader.rs
@@ -1,32 +1,10 @@
-use futures::executor::block_on;
-use futures::future::{Future, FutureExt};
-use futures::io::{
- AsyncSeek, AsyncSeekExt, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt,
- AllowStdIo, BufReader, Cursor, SeekFrom,
-};
-use futures::task::{Context, Poll};
-use futures_test::task::noop_context;
-use std::cmp;
-use std::io;
-use std::pin::Pin;
-
-/// A dummy reader intended at testing short-reads propagation.
-struct ShortReader {
- lengths: Vec<usize>,
-}
-
-impl io::Read for ShortReader {
- fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
- if self.lengths.is_empty() {
- Ok(0)
- } else {
- Ok(self.lengths.remove(0))
- }
- }
-}
-
+#[cfg(any(feature = "std", feature = "executor"))]
macro_rules! run_fill_buf {
($reader:expr) => {{
+ use futures_test::task::noop_context;
+ use futures::task::Poll;
+ use std::pin::Pin;
+
let mut cx = noop_context();
loop {
if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
@@ -36,8 +14,83 @@ macro_rules! run_fill_buf {
}};
}
+#[cfg(any(feature = "std", feature = "executor"))]
+mod util {
+ use futures::future::Future;
+ pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ use futures_test::task::noop_context;
+ use futures::task::Poll;
+ use futures::future::FutureExt;
+
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+ }
+}
+
+#[cfg(feature = "std")]
+mod maybe_pending {
+ use futures::task::{Context,Poll};
+ use std::{cmp,io};
+ use std::pin::Pin;
+ use futures::io::{AsyncRead,AsyncBufRead};
+
+ pub struct MaybePending<'a> {
+ inner: &'a [u8],
+ ready_read: bool,
+ ready_fill_buf: bool,
+ }
+
+ impl<'a> MaybePending<'a> {
+ pub fn new(inner: &'a [u8]) -> Self {
+ Self { inner, ready_read: false, ready_fill_buf: false }
+ }
+ }
+
+ impl AsyncRead for MaybePending<'_> {
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+ -> Poll<io::Result<usize>>
+ {
+ if self.ready_read {
+ self.ready_read = false;
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ } else {
+ self.ready_read = true;
+ Poll::Pending
+ }
+ }
+ }
+
+ impl AsyncBufRead for MaybePending<'_> {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
+ -> Poll<io::Result<&[u8]>>
+ {
+ if self.ready_fill_buf {
+ self.ready_fill_buf = false;
+ if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
+ let len = cmp::min(2, self.inner.len());
+ Poll::Ready(Ok(&self.inner[0..len]))
+ } else {
+ self.ready_fill_buf = true;
+ Poll::Pending
+ }
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ self.inner = &self.inner[amt..];
+ }
+ }
+}
+
+#[cfg(feature = "executor")]
#[test]
fn test_buffered_reader() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncReadExt, BufReader};
+
let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
let mut reader = BufReader::with_capacity(2, inner);
@@ -73,8 +126,14 @@ fn test_buffered_reader() {
assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
}
+#[cfg(feature = "executor")]
#[test]
fn test_buffered_reader_seek() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom};
+ use std::pin::Pin;
+ use util::run;
+
let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
@@ -88,8 +147,13 @@ fn test_buffered_reader_seek() {
assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
}
+#[cfg(feature = "executor")]
#[test]
fn test_buffered_reader_seek_underflow() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom};
+ use std::io;
+
// gimmick reader that yields its position modulo 256 for each byte
struct PositionReader {
pos: u64
@@ -134,8 +198,28 @@ fn test_buffered_reader_seek_underflow() {
assert_eq!(reader.get_ref().get_ref().pos, expected);
}
+#[cfg(feature = "executor")]
#[test]
fn test_short_reads() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncReadExt, AllowStdIo, BufReader};
+ use std::io;
+
+ /// A dummy reader intended at testing short-reads propagation.
+ struct ShortReader {
+ lengths: Vec<usize>,
+ }
+
+ impl io::Read for ShortReader {
+ fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
+ if self.lengths.is_empty() {
+ Ok(0)
+ } else {
+ Ok(self.lengths.remove(0))
+ }
+ }
+ }
+
let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
let mut reader = BufReader::new(AllowStdIo::new(inner));
let mut buf = [0, 0];
@@ -148,63 +232,13 @@ fn test_short_reads() {
assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
}
-struct MaybePending<'a> {
- inner: &'a [u8],
- ready_read: bool,
- ready_fill_buf: bool,
-}
-
-impl<'a> MaybePending<'a> {
- fn new(inner: &'a [u8]) -> Self {
- Self { inner, ready_read: false, ready_fill_buf: false }
- }
-}
-
-impl AsyncRead for MaybePending<'_> {
- fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
- -> Poll<io::Result<usize>>
- {
- if self.ready_read {
- self.ready_read = false;
- Pin::new(&mut self.inner).poll_read(cx, buf)
- } else {
- self.ready_read = true;
- Poll::Pending
- }
- }
-}
-
-impl AsyncBufRead for MaybePending<'_> {
- fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
- -> Poll<io::Result<&[u8]>>
- {
- if self.ready_fill_buf {
- self.ready_fill_buf = false;
- if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
- let len = cmp::min(2, self.inner.len());
- Poll::Ready(Ok(&self.inner[0..len]))
- } else {
- self.ready_fill_buf = true;
- Poll::Pending
- }
- }
-
- fn consume(mut self: Pin<&mut Self>, amt: usize) {
- self.inner = &self.inner[amt..];
- }
-}
-
-fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
- }
-}
-
+#[cfg(feature = "std")]
#[test]
fn maybe_pending() {
+ use futures::io::{AsyncReadExt, BufReader};
+ use util::run;
+ use maybe_pending::MaybePending;
+
let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
@@ -240,8 +274,13 @@ fn maybe_pending() {
assert_eq!(run(reader.read(&mut buf)).unwrap(), 0);
}
+#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_read() {
+ use futures::io::{AsyncBufReadExt, BufReader};
+ use util::run;
+ use maybe_pending::MaybePending;
+
let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
let mut reader = BufReader::with_capacity(2, inner);
let mut v = Vec::new();
@@ -258,55 +297,63 @@ fn maybe_pending_buf_read() {
assert_eq!(v, []);
}
-struct MaybePendingSeek<'a> {
- inner: Cursor<&'a [u8]>,
- ready: bool,
-}
-
-impl<'a> MaybePendingSeek<'a> {
- fn new(inner: &'a [u8]) -> Self {
- Self { inner: Cursor::new(inner), ready: true }
+// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
+#[cfg(feature = "std")]
+#[test]
+fn maybe_pending_seek() {
+ use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader,
+ Cursor, SeekFrom
+ };
+ use futures::task::{Context,Poll};
+ use std::io;
+ use std::pin::Pin;
+ use util::run;
+ pub struct MaybePendingSeek<'a> {
+ inner: Cursor<&'a [u8]>,
+ ready: bool,
}
-}
-impl AsyncRead for MaybePendingSeek<'_> {
- fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
- -> Poll<io::Result<usize>>
- {
- Pin::new(&mut self.inner).poll_read(cx, buf)
+ impl<'a> MaybePendingSeek<'a> {
+ pub fn new(inner: &'a [u8]) -> Self {
+ Self { inner: Cursor::new(inner), ready: true }
+ }
}
-}
-impl AsyncBufRead for MaybePendingSeek<'_> {
- fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
- -> Poll<io::Result<&[u8]>>
- {
- let this: *mut Self = &mut *self as *mut _;
- Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+ impl AsyncRead for MaybePendingSeek<'_> {
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+ -> Poll<io::Result<usize>>
+ {
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
}
- fn consume(mut self: Pin<&mut Self>, amt: usize) {
- Pin::new(&mut self.inner).consume(amt)
+ impl AsyncBufRead for MaybePendingSeek<'_> {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+ -> Poll<io::Result<&[u8]>>
+ {
+ let this: *mut Self = &mut *self as *mut _;
+ Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ Pin::new(&mut self.inner).consume(amt)
+ }
}
-}
-impl AsyncSeek for MaybePendingSeek<'_> {
- fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
- -> Poll<io::Result<u64>>
- {
- if self.ready {
- self.ready = false;
- Pin::new(&mut self.inner).poll_seek(cx, pos)
- } else {
- self.ready = true;
- Poll::Pending
+ impl AsyncSeek for MaybePendingSeek<'_> {
+ fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
+ -> Poll<io::Result<u64>>
+ {
+ if self.ready {
+ self.ready = false;
+ Pin::new(&mut self.inner).poll_seek(cx, pos)
+ } else {
+ self.ready = true;
+ Poll::Pending
+ }
}
}
-}
-// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
-#[test]
-fn maybe_pending_seek() {
let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs
index 7bdcd16..935335b 100644
--- a/tests/io_buf_writer.rs
+++ b/tests/io_buf_writer.rs
@@ -1,13 +1,70 @@
-use futures::executor::block_on;
-use futures::future::{Future, FutureExt};
-use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
-use futures::task::{Context, Poll};
-use futures_test::task::noop_context;
-use std::io;
-use std::pin::Pin;
+#[cfg(feature = "std")]
+mod maybe_pending {
+ use futures::io::AsyncWrite;
+ use futures::task::{Context, Poll};
+ use std::io;
+ use std::pin::Pin;
+
+ pub struct MaybePending {
+ pub inner: Vec<u8>,
+ ready: bool,
+ }
+
+ impl MaybePending {
+ pub fn new(inner: Vec<u8>) -> Self {
+ Self { inner, ready: false }
+ }
+ }
+
+ impl AsyncWrite for MaybePending {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready {
+ self.ready = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready = true;
+ Poll::Pending
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_close(cx)
+ }
+ }
+}
+
+#[cfg(any(feature = "std", feature = "executor"))]
+mod util {
+ use futures::future::Future;
+
+ pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ use futures::future::FutureExt;
+ use futures::task::Poll;
+ use futures_test::task::noop_context;
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+ }
+}
+
+#[cfg(feature = "executor")]
#[test]
fn buf_writer() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncWriteExt, BufWriter};
+
let mut writer = BufWriter::with_capacity(2, Vec::new());
block_on(writer.write(&[0, 1])).unwrap();
@@ -48,8 +105,12 @@ fn buf_writer() {
assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}
+#[cfg(feature = "executor")]
#[test]
fn buf_writer_inner_flushes() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncWriteExt, BufWriter};
+
let mut w = BufWriter::with_capacity(3, Vec::new());
block_on(w.write(&[0, 1])).unwrap();
assert_eq!(*w.get_ref(), []);
@@ -58,8 +119,12 @@ fn buf_writer_inner_flushes() {
assert_eq!(w, [0, 1]);
}
+#[cfg(feature = "executor")]
#[test]
fn buf_writer_seek() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
+
// FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
// use `Vec::new` instead of `vec![0; 8]`.
let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8]));
@@ -73,52 +138,14 @@ fn buf_writer_seek() {
assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
}
-struct MaybePending {
- inner: Vec<u8>,
- ready: bool,
-}
-
-impl MaybePending {
- fn new(inner: Vec<u8>) -> Self {
- Self { inner, ready: false }
- }
-}
-
-impl AsyncWrite for MaybePending {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- if self.ready {
- self.ready = false;
- Pin::new(&mut self.inner).poll_write(cx, buf)
- } else {
- self.ready = true;
- Poll::Pending
- }
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Pin::new(&mut self.inner).poll_flush(cx)
- }
-
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Pin::new(&mut self.inner).poll_close(cx)
- }
-}
-
-fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
- }
-}
-
+#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_writer() {
+ use futures::io::{AsyncWriteExt, BufWriter};
+
+ use maybe_pending::MaybePending;
+ use util::run;
+
let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new()));
run(writer.write(&[0, 1])).unwrap();
@@ -159,8 +186,14 @@ fn maybe_pending_buf_writer() {
assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}
+#[cfg(feature = "std")]
#[test]
fn maybe_pending_buf_writer_inner_flushes() {
+ use futures::io::{AsyncWriteExt, BufWriter};
+
+ use maybe_pending::MaybePending;
+ use util::run;
+
let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new()));
run(w.write(&[0, 1])).unwrap();
assert_eq!(&w.get_ref().inner, &[]);
@@ -169,59 +202,66 @@ fn maybe_pending_buf_writer_inner_flushes() {
assert_eq!(w, [0, 1]);
}
+#[cfg(feature = "std")]
+#[test]
+fn maybe_pending_buf_writer_seek() {
+ use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
+ use futures::task::{Context, Poll};
+ use std::io;
+ use std::pin::Pin;
-struct MaybePendingSeek {
- inner: Cursor<Vec<u8>>,
- ready_write: bool,
- ready_seek: bool,
-}
+ use util::run;
-impl MaybePendingSeek {
- fn new(inner: Vec<u8>) -> Self {
- Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false }
+ struct MaybePendingSeek {
+ inner: Cursor<Vec<u8>>,
+ ready_write: bool,
+ ready_seek: bool,
}
-}
-impl AsyncWrite for MaybePendingSeek {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- if self.ready_write {
- self.ready_write = false;
- Pin::new(&mut self.inner).poll_write(cx, buf)
- } else {
- self.ready_write = true;
- Poll::Pending
+ impl MaybePendingSeek {
+ fn new(inner: Vec<u8>) -> Self {
+ Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false }
}
}
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Pin::new(&mut self.inner).poll_flush(cx)
- }
+ impl AsyncWrite for MaybePendingSeek {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready_write {
+ self.ready_write = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready_write = true;
+ Poll::Pending
+ }
+ }
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Pin::new(&mut self.inner).poll_close(cx)
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_close(cx)
+ }
}
-}
-impl AsyncSeek for MaybePendingSeek {
- fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
- -> Poll<io::Result<u64>>
- {
- if self.ready_seek {
- self.ready_seek = false;
- Pin::new(&mut self.inner).poll_seek(cx, pos)
- } else {
- self.ready_seek = true;
- Poll::Pending
+ impl AsyncSeek for MaybePendingSeek {
+ fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
+ -> Poll<io::Result<u64>>
+ {
+ if self.ready_seek {
+ self.ready_seek = false;
+ Pin::new(&mut self.inner).poll_seek(cx, pos)
+ } else {
+ self.ready_seek = true;
+ Poll::Pending
+ }
}
}
-}
-#[test]
-fn maybe_pending_buf_writer_seek() {
// FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
// use `Vec::new` instead of `vec![0; 8]`.
let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8]));
diff --git a/tests/io_cursor.rs b/tests/io_cursor.rs
index 4f80a75..0a93c83 100644
--- a/tests/io_cursor.rs
+++ b/tests/io_cursor.rs
@@ -1,11 +1,12 @@
-use assert_matches::assert_matches;
-use futures::future::lazy;
-use futures::io::{AsyncWrite, Cursor};
-use futures::task::Poll;
-use std::pin::Pin;
-
+#[cfg(all(feature = "std", feature = "executor"))]
#[test]
fn cursor_asyncwrite_vec() {
+ use assert_matches::assert_matches;
+ use futures::future::lazy;
+ use futures::io::{AsyncWrite, Cursor};
+ use futures::task::Poll;
+ use std::pin::Pin;
+
let mut cursor = Cursor::new(vec![0; 5]);
futures::executor::block_on(lazy(|cx| {
assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
@@ -16,8 +17,15 @@ fn cursor_asyncwrite_vec() {
assert_eq!(cursor.into_inner(), [1, 2, 3, 4, 5, 6, 6, 7]);
}
+#[cfg(all(feature = "std", feature = "executor"))]
#[test]
fn cursor_asyncwrite_box() {
+ use assert_matches::assert_matches;
+ use futures::future::lazy;
+ use futures::io::{AsyncWrite, Cursor};
+ use futures::task::Poll;
+ use std::pin::Pin;
+
let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice());
futures::executor::block_on(lazy(|cx| {
assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
diff --git a/tests/io_lines.rs b/tests/io_lines.rs
index 39eafa9..e10edd0 100644
--- a/tests/io_lines.rs
+++ b/tests/io_lines.rs
@@ -1,19 +1,34 @@
-use futures::executor::block_on;
-use futures::future::{Future, FutureExt};
-use futures::stream::{self, StreamExt, TryStreamExt};
-use futures::io::{AsyncBufReadExt, Cursor};
-use futures::task::Poll;
-use futures_test::io::AsyncReadTestExt;
-use futures_test::task::noop_context;
-
-macro_rules! block_on_next {
- ($expr:expr) => {
- block_on($expr.next()).unwrap().unwrap()
- };
+#[cfg(any(feature = "std", feature = "executor"))]
+mod util {
+ use futures::future::Future;
+
+ pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ use futures_test::task::noop_context;
+ use futures::task::Poll;
+ use futures::future::FutureExt;
+
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+ }
}
+#[cfg(feature = "executor")]
#[test]
fn lines() {
+ use futures::executor::block_on;
+ use futures::stream::StreamExt;
+ use futures::io::{AsyncBufReadExt, Cursor};
+
+ macro_rules! block_on_next {
+ ($expr:expr) => {
+ block_on($expr.next()).unwrap().unwrap()
+ };
+ }
+
let buf = Cursor::new(&b"12\r"[..]);
let mut s = buf.lines();
assert_eq!(block_on_next!(s), "12\r".to_string());
@@ -26,23 +41,21 @@ fn lines() {
assert!(block_on(s.next()).is_none());
}
-fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
- }
-}
-
-macro_rules! run_next {
- ($expr:expr) => {
- run($expr.next()).unwrap().unwrap()
- };
-}
-
+#[cfg(feature = "std")]
#[test]
fn maybe_pending() {
+ use futures::stream::{self, StreamExt, TryStreamExt};
+ use futures::io::AsyncBufReadExt;
+ use futures_test::io::AsyncReadTestExt;
+
+ use util::run;
+
+ macro_rules! run_next {
+ ($expr:expr) => {
+ run($expr.next()).unwrap().unwrap()
+ };
+ }
+
let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]])
.map(Ok)
.into_async_read()
diff --git a/tests/io_read.rs b/tests/io_read.rs
index f99c4ed..ba68fcc 100644
--- a/tests/io_read.rs
+++ b/tests/io_read.rs
@@ -1,33 +1,44 @@
-use futures::io::AsyncRead;
-use futures_test::task::panic_context;
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
+#[cfg(feature = "std")]
+mod mock_reader {
+ use futures::io::AsyncRead;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
-struct MockReader {
- fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
-}
+ pub struct MockReader {
+ fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
+ }
-impl MockReader {
- pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
- MockReader { fun: Box::new(fun) }
+ impl MockReader {
+ pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+ MockReader { fun: Box::new(fun) }
+ }
}
-}
-impl AsyncRead for MockReader {
- fn poll_read(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &mut [u8]
- ) -> Poll<io::Result<usize>> {
- (self.get_mut().fun)(buf)
+ impl AsyncRead for MockReader {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8]
+ ) -> Poll<io::Result<usize>> {
+ (self.get_mut().fun)(buf)
+ }
}
}
/// Verifies that the default implementation of `poll_read_vectored`
/// calls `poll_read` with an empty slice if no buffers are provided.
+#[cfg(feature = "std")]
#[test]
fn read_vectored_no_buffers() {
+ use futures::io::AsyncRead;
+ use futures_test::task::panic_context;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::Poll;
+
+ use mock_reader::MockReader;
+
let mut reader = MockReader::new(|buf| {
assert_eq!(buf, b"");
Err(io::ErrorKind::BrokenPipe.into()).into()
@@ -42,8 +53,17 @@ fn read_vectored_no_buffers() {
/// Verifies that the default implementation of `poll_read_vectored`
/// calls `poll_read` with the first non-empty buffer.
+#[cfg(feature = "std")]
#[test]
fn read_vectored_first_non_empty() {
+ use futures::io::AsyncRead;
+ use futures_test::task::panic_context;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::Poll;
+
+ use mock_reader::MockReader;
+
let mut reader = MockReader::new(|buf| {
assert_eq!(buf.len(), 4);
buf.copy_from_slice(b"four");
diff --git a/tests/io_read_exact.rs b/tests/io_read_exact.rs
index 4941773..a772e34 100644
--- a/tests/io_read_exact.rs
+++ b/tests/io_read_exact.rs
@@ -1,8 +1,9 @@
-use futures::executor::block_on;
-use futures::io::AsyncReadExt;
-
+#[cfg(feature = "executor")]
#[test]
fn read_exact() {
+ use futures::executor::block_on;
+ use futures::io::AsyncReadExt;
+
let mut reader: &[u8] = &[1, 2, 3, 4, 5];
let mut out = [0u8; 3];
diff --git a/tests/io_read_line.rs b/tests/io_read_line.rs
index d1dba5e..ab25f26 100644
--- a/tests/io_read_line.rs
+++ b/tests/io_read_line.rs
@@ -1,13 +1,9 @@
-use futures::executor::block_on;
-use futures::future::{Future, FutureExt};
-use futures::stream::{self, StreamExt, TryStreamExt};
-use futures::io::{AsyncBufReadExt, Cursor};
-use futures::task::Poll;
-use futures_test::io::AsyncReadTestExt;
-use futures_test::task::noop_context;
-
+#[cfg(feature = "executor")]
#[test]
fn read_line() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncBufReadExt, Cursor};
+
let mut buf = Cursor::new(b"12");
let mut v = String::new();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
@@ -25,17 +21,28 @@ fn read_line() {
assert_eq!(v, "");
}
-fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
+#[cfg(feature = "std")]
+#[test]
+fn maybe_pending() {
+ use futures::future::Future;
+
+ fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ use futures::future::FutureExt;
+ use futures::task::Poll;
+ use futures_test::task::noop_context;
+
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
}
}
-}
-#[test]
-fn maybe_pending() {
+ use futures::stream::{self, StreamExt, TryStreamExt};
+ use futures::io::AsyncBufReadExt;
+ use futures_test::io::AsyncReadTestExt;
+
let mut buf = b"12".interleave_pending();
let mut v = String::new();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
diff --git a/tests/io_read_to_string.rs b/tests/io_read_to_string.rs
index db825af..0a79a22 100644
--- a/tests/io_read_to_string.rs
+++ b/tests/io_read_to_string.rs
@@ -1,13 +1,9 @@
-use futures::executor::block_on;
-use futures::future::{Future, FutureExt};
-use futures::stream::{self, StreamExt, TryStreamExt};
-use futures::io::{AsyncReadExt, Cursor};
-use futures::task::Poll;
-use futures_test::io::AsyncReadTestExt;
-use futures_test::task::noop_context;
-
+#[cfg(all(feature = "std", feature = "executor"))]
#[test]
fn read_to_string() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncReadExt, Cursor};
+
let mut c = Cursor::new(&b""[..]);
let mut v = String::new();
assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0);
@@ -23,17 +19,26 @@ fn read_to_string() {
assert!(block_on(c.read_to_string(&mut v)).is_err());
}
-fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
- }
-}
-
+#[cfg(feature = "std")]
#[test]
fn interleave_pending() {
+ use futures::future::Future;
+ use futures::stream::{self, StreamExt, TryStreamExt};
+ use futures::io::AsyncReadExt;
+ use futures_test::io::AsyncReadTestExt;
+
+ fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ use futures::future::FutureExt;
+ use futures_test::task::noop_context;
+ use futures::task::Poll;
+
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+ }
let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]])
.map(Ok)
.into_async_read()
diff --git a/tests/io_read_until.rs b/tests/io_read_until.rs
index 5152281..1e018b7 100644
--- a/tests/io_read_until.rs
+++ b/tests/io_read_until.rs
@@ -1,13 +1,9 @@
-use futures::executor::block_on;
-use futures::future::{Future, FutureExt};
-use futures::stream::{self, StreamExt, TryStreamExt};
-use futures::io::{AsyncBufReadExt, Cursor};
-use futures::task::Poll;
-use futures_test::io::AsyncReadTestExt;
-use futures_test::task::noop_context;
-
+#[cfg(feature = "executor")]
#[test]
fn read_until() {
+ use futures::executor::block_on;
+ use futures::io::{AsyncBufReadExt, Cursor};
+
let mut buf = Cursor::new(b"12");
let mut v = Vec::new();
assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2);
@@ -25,17 +21,29 @@ fn read_until() {
assert_eq!(v, []);
}
-fn run<F: Future + Unpin>(mut f: F) -> F::Output {
- let mut cx = noop_context();
- loop {
- if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
- return x;
- }
- }
-}
+#[cfg(feature = "std")]
#[test]
fn maybe_pending() {
+ use futures::future::Future;
+
+ fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ use futures::future::FutureExt;
+ use futures_test::task::noop_context;
+ use futures::task::Poll;
+
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+ }
+
+ use futures::stream::{self, StreamExt, TryStreamExt};
+ use futures::io::AsyncBufReadExt;
+ use futures_test::io::AsyncReadTestExt;
+
let mut buf = b"12".interleave_pending();
let mut v = Vec::new();
assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2);
diff --git a/tests/io_window.rs b/tests/io_window.rs
index 98df69c..8cc41a3 100644
--- a/tests/io_window.rs
+++ b/tests/io_window.rs
@@ -1,3 +1,4 @@
+#![cfg(feature = "std")]
use futures::io::Window;
#[test]
diff --git a/tests/io_write.rs b/tests/io_write.rs
index b963444..af0d5c3 100644
--- a/tests/io_write.rs
+++ b/tests/io_write.rs
@@ -1,41 +1,52 @@
-use futures::io::AsyncWrite;
-use futures_test::task::panic_context;
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
+#[cfg(feature = "std")]
+mod mock_writer {
+ use futures::io::AsyncWrite;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
-struct MockWriter {
- fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
-}
-
-impl MockWriter {
- pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
- MockWriter { fun: Box::new(fun) }
+ pub struct MockWriter {
+ fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
}
-}
-impl AsyncWrite for MockWriter {
- fn poll_write(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- (self.get_mut().fun)(buf)
+ impl MockWriter {
+ pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+ MockWriter { fun: Box::new(fun) }
+ }
}
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- panic!()
- }
+ impl AsyncWrite for MockWriter {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ (self.get_mut().fun)(buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ panic!()
+ }
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- panic!()
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ panic!()
+ }
}
}
/// Verifies that the default implementation of `poll_write_vectored`
/// calls `poll_write` with an empty slice if no buffers are provided.
+#[cfg(feature = "std")]
#[test]
fn write_vectored_no_buffers() {
+ use futures::io::AsyncWrite;
+ use futures_test::task::panic_context;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::Poll;
+
+ use mock_writer::MockWriter;
+
let mut writer = MockWriter::new(|buf| {
assert_eq!(buf, b"");
Err(io::ErrorKind::BrokenPipe.into()).into()
@@ -50,8 +61,17 @@ fn write_vectored_no_buffers() {
/// Verifies that the default implementation of `poll_write_vectored`
/// calls `poll_write` with the first non-empty buffer.
+#[cfg(feature = "std")]
#[test]
fn write_vectored_first_non_empty() {
+ use futures::io::AsyncWrite;
+ use futures_test::task::panic_context;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::Poll;
+
+ use mock_writer::MockWriter;
+
let mut writer = MockWriter::new(|buf| {
assert_eq!(buf, b"four");
Poll::Ready(Ok(4))
diff --git a/tests/join_all.rs b/tests/join_all.rs
index 63967bf..0d8fbf2 100644
--- a/tests/join_all.rs
+++ b/tests/join_all.rs
@@ -1,29 +1,38 @@
-use futures_util::future::*;
-use std::future::Future;
-use futures::executor::block_on;
-use std::fmt::Debug;
-
-fn assert_done<T, F>(actual_fut: F, expected: T)
-where
- T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
-{
- let output = block_on(actual_fut());
- assert_eq!(output, expected);
+#[cfg(feature = "executor")]
+mod util {
+ use std::future::Future;
+ use std::fmt::Debug;
+
+ pub fn assert_done<T, F>(actual_fut: F, expected: T)
+ where
+ T: PartialEq + Debug,
+ F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
+ {
+ use futures::executor::block_on;
+
+ let output = block_on(actual_fut());
+ assert_eq!(output, expected);
+ }
}
+#[cfg(feature = "executor")]
#[test]
fn collect_collects() {
- assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
- assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
+ use futures_util::future::{join_all,ready};
+
+ util::assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
+ util::assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
// REVIEW: should this be implemented?
// assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
// TODO: needs more tests
}
+#[cfg(feature = "executor")]
#[test]
fn join_all_iter_lifetime() {
+ use futures_util::future::{join_all,ready};
+ use std::future::Future;
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
// conservative type parameterization of `JoinAll`.
fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
@@ -31,12 +40,15 @@ fn join_all_iter_lifetime() {
Box::new(join_all(iter))
}
- assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]);
+ util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]);
}
+#[cfg(feature = "executor")]
#[test]
fn join_all_from_iter() {
- assert_done(
+ use futures_util::future::{JoinAll,ready};
+
+ util::assert_done(
|| Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
vec![1, 2],
)
diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs
index 111f65a..e6a609b 100644
--- a/tests/macro_comma_support.rs
+++ b/tests/macro_comma_support.rs
@@ -1,29 +1,41 @@
-#[macro_use]
-extern crate futures;
-
-use futures::{
- executor::block_on,
- future::{self, FutureExt},
- task::Poll,
-};
-
+#[cfg(feature = "executor")]
#[test]
fn ready() {
+ use futures::{
+ executor::block_on,
+ future,
+ task::Poll,
+ ready,
+ };
+
block_on(future::poll_fn(|_| {
ready!(Poll::Ready(()),);
Poll::Ready(())
}))
}
+#[cfg(all(feature = "executor", feature = "async-await"))]
#[test]
fn poll() {
+ use futures::{
+ executor::block_on,
+ future::FutureExt,
+ poll,
+ };
+
block_on(async {
let _ = poll!(async {}.boxed(),);
})
}
+#[cfg(all(feature = "executor", feature = "async-await"))]
#[test]
fn join() {
+ use futures::{
+ executor::block_on,
+ join
+ };
+
block_on(async {
let future1 = async { 1 };
let future2 = async { 2 };
@@ -31,8 +43,15 @@ fn join() {
})
}
+#[cfg(all(feature = "executor", feature = "async-await"))]
#[test]
fn try_join() {
+ use futures::{
+ executor::block_on,
+ future::FutureExt,
+ try_join,
+ };
+
block_on(async {
let future1 = async { 1 }.never_error();
let future2 = async { 2 }.never_error();
diff --git a/tests/mutex.rs b/tests/mutex.rs
index bad53a9..7ee9f41 100644
--- a/tests/mutex.rs
+++ b/tests/mutex.rs
@@ -1,23 +1,24 @@
-use futures::channel::mpsc;
-use futures::executor::block_on;
-use futures::future::{ready, FutureExt};
-use futures::lock::Mutex;
-use futures::stream::StreamExt;
-use futures::task::{Context, SpawnExt};
-use futures_test::future::FutureTestExt;
-use futures_test::task::{new_count_waker, panic_context};
-use std::sync::Arc;
-
+#[cfg(all(feature = "alloc", feature = "std"))]
#[test]
fn mutex_acquire_uncontested() {
+ use futures::future::FutureExt;
+ use futures::lock::Mutex;
+ use futures_test::task::panic_context;
+
let mutex = Mutex::new(());
for _ in 0..10 {
assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready());
}
}
+#[cfg(all(feature = "alloc", feature = "std"))]
#[test]
fn mutex_wakes_waiters() {
+ use futures::future::FutureExt;
+ use futures::lock::Mutex;
+ use futures::task::Context;
+ use futures_test::task::{new_count_waker, panic_context};
+
let mutex = Mutex::new(());
let (waker, counter) = new_count_waker();
let lock = mutex.lock().poll_unpin(&mut panic_context());
@@ -34,8 +35,18 @@ fn mutex_wakes_waiters() {
assert!(waiter.poll_unpin(&mut panic_context()).is_ready());
}
+#[cfg(feature = "thread-pool")]
#[test]
fn mutex_contested() {
+ use futures::channel::mpsc;
+ use futures::executor::block_on;
+ use futures::future::ready;
+ use futures::lock::Mutex;
+ use futures::stream::StreamExt;
+ use futures::task::SpawnExt;
+ use futures_test::future::FutureTestExt;
+ use std::sync::Arc;
+
let (tx, mut rx) = mpsc::unbounded();
let pool = futures::executor::ThreadPool::builder()
.pool_size(16)
diff --git a/tests/object_safety.rs b/tests/object_safety.rs
index 30c892f..b49ee88 100644
--- a/tests/object_safety.rs
+++ b/tests/object_safety.rs
@@ -28,6 +28,7 @@ fn sink() {
assert_is_object_safe::<&dyn Sink<(), Error = ()>>();
}
+#[cfg(feature = "std")] // futures::io
#[test]
fn io() {
// `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt` and `AsyncBufReadExt` are not object safe.
diff --git a/tests/oneshot.rs b/tests/oneshot.rs
index 58951ec..302160b 100644
--- a/tests/oneshot.rs
+++ b/tests/oneshot.rs
@@ -1,11 +1,12 @@
-use futures::channel::oneshot;
-use futures::future::{FutureExt, TryFutureExt};
-use futures_test::future::FutureTestExt;
-use std::sync::mpsc;
-use std::thread;
-
+#[cfg(feature = "alloc")] // channel
#[test]
fn oneshot_send1() {
+ use futures::channel::oneshot;
+ use futures::future::TryFutureExt;
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+ use std::thread;
+
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -15,8 +16,15 @@ fn oneshot_send1() {
t.join().unwrap();
}
+#[cfg(feature = "alloc")] // channel
#[test]
fn oneshot_send2() {
+ use futures::channel::oneshot;
+ use futures::future::TryFutureExt;
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+ use std::thread;
+
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -25,8 +33,15 @@ fn oneshot_send2() {
assert_eq!(1, rx2.recv().unwrap());
}
+#[cfg(feature = "alloc")] // channel
#[test]
fn oneshot_send3() {
+ use futures::channel::oneshot;
+ use futures::future::TryFutureExt;
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+ use std::thread;
+
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -35,8 +50,14 @@ fn oneshot_send3() {
assert_eq!(1, rx2.recv().unwrap());
}
+#[cfg(feature = "alloc")] // channel
#[test]
fn oneshot_drop_tx1() {
+ use futures::channel::oneshot;
+ use futures::future::FutureExt;
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -46,8 +67,15 @@ fn oneshot_drop_tx1() {
assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap());
}
+#[cfg(feature = "alloc")] // channel
#[test]
fn oneshot_drop_tx2() {
+ use futures::channel::oneshot;
+ use futures::future::FutureExt;
+ use futures_test::future::FutureTestExt;
+ use std::sync::mpsc;
+ use std::thread;
+
let (tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = mpsc::channel();
@@ -58,8 +86,11 @@ fn oneshot_drop_tx2() {
assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap());
}
+#[cfg(feature = "alloc")] // channel
#[test]
fn oneshot_drop_rx() {
+ use futures::channel::oneshot;
+
let (tx, rx) = oneshot::channel::<i32>();
drop(rx);
assert_eq!(Err(2), tx.send(2));
diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs
index 15a0bef..be6ccc3 100644
--- a/tests/ready_queue.rs
+++ b/tests/ready_queue.rs
@@ -1,18 +1,20 @@
-use futures::channel::oneshot;
-use futures::executor::{block_on, block_on_stream};
-use futures::future;
-use futures::stream::{FuturesUnordered, StreamExt};
-use futures::task::Poll;
-use futures_test::task::noop_context;
-use std::panic::{self, AssertUnwindSafe};
-use std::sync::{Arc, Barrier};
-use std::thread;
-
-trait AssertSendSync: Send + Sync {}
-impl AssertSendSync for FuturesUnordered<()> {}
+#[cfg(feature = "alloc")] // FuturesUnordered
+mod assert_send_sync {
+ use futures::stream::FuturesUnordered;
+ pub trait AssertSendSync: Send + Sync {}
+ impl AssertSendSync for FuturesUnordered<()> {}
+}
+
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn basic_usage() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures::task::Poll;
+
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
let (tx1, rx1) = oneshot::channel();
@@ -39,8 +41,15 @@ fn basic_usage() {
}));
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn resolving_errors() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures::task::Poll;
+
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
let (tx1, rx1) = oneshot::channel();
@@ -67,8 +76,15 @@ fn resolving_errors() {
}));
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn dropping_ready_queue() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::stream::FuturesUnordered;
+ use futures_test::task::noop_context;
+
block_on(future::lazy(move |_| {
let queue = FuturesUnordered::new();
let (mut tx1, rx1) = oneshot::channel::<()>();
@@ -94,8 +110,15 @@ fn dropping_ready_queue() {
}));
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn stress() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on_stream;
+ use futures::stream::FuturesUnordered;
+ use std::sync::{Arc, Barrier};
+ use std::thread;
+
const ITER: usize = 300;
for i in 0..ITER {
@@ -137,8 +160,15 @@ fn stress() {
}
}
+#[cfg(feature = "executor")] // executor
#[test]
fn panicking_future_dropped() {
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures::task::Poll;
+ use std::panic::{self, AssertUnwindSafe};
+
block_on(future::lazy(move |cx| {
let mut queue = FuturesUnordered::new();
queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
diff --git a/tests/recurse.rs b/tests/recurse.rs
index 2920a41..d3f4124 100644
--- a/tests/recurse.rs
+++ b/tests/recurse.rs
@@ -1,10 +1,11 @@
-use futures::executor::block_on;
-use futures::future::{self, FutureExt, BoxFuture};
-use std::sync::mpsc;
-use std::thread;
-
+#[cfg(feature = "executor")] // executor::
#[test]
fn lots() {
+ use futures::executor::block_on;
+ use futures::future::{self, FutureExt, BoxFuture};
+ use std::sync::mpsc;
+ use std::thread;
+
fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> {
let (n, x) = input;
if n == 0 {
diff --git a/tests/select_all.rs b/tests/select_all.rs
index aad977d..9a6d736 100644
--- a/tests/select_all.rs
+++ b/tests/select_all.rs
@@ -1,9 +1,10 @@
-use futures::executor::block_on;
-use futures::future::{ready, select_all};
-use std::collections::HashSet;
-
+#[cfg(feature = "executor")] // executor::
#[test]
fn smoke() {
+ use futures::executor::block_on;
+ use futures::future::{ready, select_all};
+ use std::collections::HashSet;
+
let v = vec![
ready(1),
ready(2),
diff --git a/tests/select_ok.rs b/tests/select_ok.rs
index db88a95..dd3703e 100644
--- a/tests/select_ok.rs
+++ b/tests/select_ok.rs
@@ -1,8 +1,9 @@
-use futures::executor::block_on;
-use futures::future::{err, ok, select_ok};
-
+#[cfg(feature = "executor")] // executor::
#[test]
fn ignore_err() {
+ use futures::executor::block_on;
+ use futures::future::{err, ok, select_ok};
+
let v = vec![
err(1),
err(2),
@@ -21,8 +22,12 @@ fn ignore_err() {
assert!(v.is_empty());
}
+#[cfg(feature = "executor")] // executor::
#[test]
fn last_err() {
+ use futures::executor::block_on;
+ use futures::future::{err, ok, select_ok};
+
let v = vec![
ok(1),
err(2),
diff --git a/tests/shared.rs b/tests/shared.rs
index 8402bfe..21e80fe 100644
--- a/tests/shared.rs
+++ b/tests/shared.rs
@@ -1,12 +1,23 @@
-use futures::channel::oneshot;
-use futures::executor::{block_on, LocalPool};
-use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj};
-use futures::task::LocalSpawn;
-use std::cell::{Cell, RefCell};
-use std::rc::Rc;
-use std::thread;
+mod count_clone {
+ use std::cell::Cell;
+ use std::rc::Rc;
+ pub struct CountClone(pub Rc<Cell<i32>>);
+
+ impl Clone for CountClone {
+ fn clone(&self) -> Self {
+ self.0.set(self.0.get() + 1);
+ CountClone(self.0.clone())
+ }
+ }
+}
+
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use std::thread;
let (tx, rx) = oneshot::channel::<i32>();
let f = rx.shared();
let join_handles = (0..threads_number)
@@ -26,23 +37,32 @@ fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
}
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn one_thread() {
send_shared_oneshot_and_wait_on_multiple_threads(1);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn two_threads() {
send_shared_oneshot_and_wait_on_multiple_threads(2);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn many_threads() {
send_shared_oneshot_and_wait_on_multiple_threads(1000);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn drop_on_one_task_ok() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::{self, FutureExt, TryFutureExt};
+ use std::thread;
+
let (tx, rx) = oneshot::channel::<u32>();
let f1 = rx.shared();
let f2 = f1.clone();
@@ -69,15 +89,22 @@ fn drop_on_one_task_ok() {
t2.join().unwrap();
}
+#[cfg(feature = "executor")] // executor::
#[test]
fn drop_in_poll() {
+ use futures::executor::block_on;
+ use futures::future::{self, FutureExt, LocalFutureObj};
+ use std::cell::RefCell;
+ use std::rc::Rc;
+
let slot1 = Rc::new(RefCell::new(None));
let slot2 = slot1.clone();
let future1 = future::lazy(move |_| {
slot2.replace(None); // Drop future
1
- }).shared();
+ })
+ .shared();
let future2 = LocalFutureObj::new(Box::new(future1.clone()));
slot1.replace(Some(future2));
@@ -85,8 +112,14 @@ fn drop_in_poll() {
assert_eq!(block_on(future1), 1);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn peek() {
+ use futures::channel::oneshot;
+ use futures::executor::LocalPool;
+ use futures::future::{FutureExt, LocalFutureObj};
+ use futures::task::LocalSpawn;
+
let mut local_pool = LocalPool::new();
let spawn = &mut local_pool.spawner();
@@ -108,24 +141,26 @@ fn peek() {
}
// Once the Shared has been polled, the value is peekable on the clone.
- spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap();
+ spawn
+ .spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ()))))
+ .unwrap();
local_pool.run();
for _ in 0..2 {
assert_eq!(*f2.peek().unwrap(), Ok(42));
}
}
-struct CountClone(Rc<Cell<i32>>);
-
-impl Clone for CountClone {
- fn clone(&self) -> Self {
- self.0.set(self.0.get() + 1);
- CountClone(self.0.clone())
- }
-}
-
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn dont_clone_in_single_owner_shared_future() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use std::cell::Cell;
+ use std::rc::Rc;
+
+ use count_clone::CountClone;
+
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -136,8 +171,17 @@ fn dont_clone_in_single_owner_shared_future() {
assert_eq!(block_on(rx).unwrap().0.get(), 0);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn dont_do_unnecessary_clones_on_output() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use std::cell::Cell;
+ use std::rc::Rc;
+
+ use count_clone::CountClone;
+
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -149,3 +193,30 @@ fn dont_do_unnecessary_clones_on_output() {
assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2);
assert_eq!(block_on(rx).unwrap().0.get(), 2);
}
+
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
+#[test]
+fn shared_future_that_wakes_itself_until_pending_is_returned() {
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use std::cell::Cell;
+ use std::task::Poll;
+
+ let proceed = Cell::new(false);
+ let fut = futures::future::poll_fn(|cx| {
+ if proceed.get() {
+ Poll::Ready(())
+ } else {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ })
+ .shared();
+
+ // The join future can only complete if the second future gets a chance to run after the first
+ // has returned pending
+ assert_eq!(
+ block_on(futures::future::join(fut, async { proceed.set(true) })),
+ ((), ())
+ );
+}
diff --git a/tests/sink.rs b/tests/sink.rs
index f967e1b..f6ce28c 100644
--- a/tests/sink.rs
+++ b/tests/sink.rs
@@ -1,43 +1,258 @@
-use futures::channel::{mpsc, oneshot};
-use futures::executor::block_on;
-use futures::future::{self, Future, FutureExt, TryFutureExt};
-use futures::never::Never;
-use futures::ready;
-use futures::sink::{Sink, SinkErrInto, SinkExt};
-use futures::stream::{self, Stream, StreamExt};
-use futures::task::{self, ArcWake, Context, Poll, Waker};
-use futures_test::task::panic_context;
-use std::cell::{Cell, RefCell};
-use std::collections::VecDeque;
-use std::fmt;
-use std::mem;
-use std::pin::Pin;
-use std::rc::Rc;
-use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, Ordering};
-
-fn sassert_next<S>(s: &mut S, item: S::Item)
-where
- S: Stream + Unpin,
- S::Item: Eq + fmt::Debug,
-{
- match s.poll_next_unpin(&mut panic_context()) {
- Poll::Ready(None) => panic!("stream is at its end"),
- Poll::Ready(Some(e)) => assert_eq!(e, item),
- Poll::Pending => panic!("stream wasn't ready"),
+#[allow(dead_code)]
+mod sassert_next {
+ use futures::stream::{Stream, StreamExt};
+ use futures::task::Poll;
+ use futures_test::task::panic_context;
+ use std::fmt;
+
+ pub fn sassert_next<S>(s: &mut S, item: S::Item)
+ where
+ S: Stream + Unpin,
+ S::Item: Eq + fmt::Debug,
+ {
+ match s.poll_next_unpin(&mut panic_context()) {
+ Poll::Ready(None) => panic!("stream is at its end"),
+ Poll::Ready(Some(e)) => assert_eq!(e, item),
+ Poll::Pending => panic!("stream wasn't ready"),
+ }
+ }
+}
+
+#[allow(dead_code)]
+mod unwrap {
+ use futures::task::Poll;
+ use std::fmt;
+
+ pub fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
+ match x {
+ Poll::Ready(Ok(x)) => x,
+ Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
+ Poll::Pending => panic!("Poll::Pending"),
+ }
+ }
+}
+
+#[allow(dead_code)]
+#[cfg(feature = "alloc")] // ArcWake
+mod flag_cx {
+ use futures::task::{self, ArcWake, Context};
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicBool, Ordering};
+
+ // An Unpark struct that records unpark events for inspection
+ pub struct Flag(AtomicBool);
+
+ impl Flag {
+ pub fn new() -> Arc<Self> {
+ Arc::new(Self(AtomicBool::new(false)))
+ }
+
+ pub fn take(&self) -> bool {
+ self.0.swap(false, Ordering::SeqCst)
+ }
+
+ pub fn set(&self, v: bool) {
+ self.0.store(v, Ordering::SeqCst)
+ }
+ }
+
+ impl ArcWake for Flag {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.set(true)
+ }
+ }
+
+ pub fn flag_cx<F, R>(f: F) -> R
+ where
+ F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
+ {
+ let flag = Flag::new();
+ let waker = task::waker_ref(&flag);
+ let cx = &mut Context::from_waker(&waker);
+ f(flag.clone(), cx)
+ }
+}
+
+#[allow(dead_code)]
+mod start_send_fut {
+ use futures::future::Future;
+ use futures::ready;
+ use futures::sink::Sink;
+ use futures::task::{Context, Poll};
+ use std::pin::Pin;
+
+ // Sends a value on an i32 channel sink
+ pub struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
+
+ impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
+ pub fn new(sink: S, item: Item) -> Self {
+ Self(Some(sink), Some(item))
+ }
+ }
+
+ impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
+ type Output = Result<S, S::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self(inner, item) = self.get_mut();
+ {
+ let mut inner = inner.as_mut().unwrap();
+ ready!(Pin::new(&mut inner).poll_ready(cx))?;
+ Pin::new(&mut inner).start_send(item.take().unwrap())?;
+ }
+ Poll::Ready(Ok(inner.take().unwrap()))
+ }
+ }
+}
+
+#[allow(dead_code)]
+mod manual_flush {
+ use futures::sink::Sink;
+ use futures::task::{Context, Poll, Waker};
+ use std::mem;
+ use std::pin::Pin;
+
+ // Immediately accepts all requests to start pushing, but completion is managed
+ // by manually flushing
+ pub struct ManualFlush<T: Unpin> {
+ data: Vec<T>,
+ waiting_tasks: Vec<Waker>,
+ }
+
+ impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
+ type Error = ();
+
+ fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
+ if let Some(item) = item {
+ self.data.push(item);
+ } else {
+ self.force_flush();
+ }
+ Ok(())
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.data.is_empty() {
+ Poll::Ready(Ok(()))
+ } else {
+ self.waiting_tasks.push(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_flush(cx)
+ }
+ }
+
+ impl<T: Unpin> ManualFlush<T> {
+ pub fn new() -> Self {
+ Self {
+ data: Vec::new(),
+ waiting_tasks: Vec::new(),
+ }
+ }
+
+ pub fn force_flush(&mut self) -> Vec<T> {
+ for task in self.waiting_tasks.drain(..) {
+ task.wake()
+ }
+ mem::replace(&mut self.data, Vec::new())
+ }
}
}
-fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
- match x {
- Poll::Ready(Ok(x)) => x,
- Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
- Poll::Pending => panic!("Poll::Pending"),
+#[allow(dead_code)]
+mod allowance {
+ use futures::sink::Sink;
+ use futures::task::{Context, Poll, Waker};
+ use std::cell::{Cell, RefCell};
+ use std::pin::Pin;
+ use std::rc::Rc;
+
+ pub struct ManualAllow<T: Unpin> {
+ pub data: Vec<T>,
+ allow: Rc<Allow>,
+ }
+
+ pub struct Allow {
+ flag: Cell<bool>,
+ tasks: RefCell<Vec<Waker>>,
+ }
+
+ impl Allow {
+ pub fn new() -> Self {
+ Self {
+ flag: Cell::new(false),
+ tasks: RefCell::new(Vec::new()),
+ }
+ }
+
+ pub fn check(&self, cx: &mut Context<'_>) -> bool {
+ if self.flag.get() {
+ true
+ } else {
+ self.tasks.borrow_mut().push(cx.waker().clone());
+ false
+ }
+ }
+
+ pub fn start(&self) {
+ self.flag.set(true);
+ let mut tasks = self.tasks.borrow_mut();
+ for task in tasks.drain(..) {
+ task.wake();
+ }
+ }
+ }
+
+ impl<T: Unpin> Sink<T> for ManualAllow<T> {
+ type Error = ();
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.allow.check(cx) {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+ self.data.push(item);
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ pub fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
+ let allow = Rc::new(Allow::new());
+ let manual_allow = ManualAllow {
+ data: Vec::new(),
+ allow: allow.clone(),
+ };
+ (manual_allow, allow)
}
}
+
+#[cfg(feature = "alloc")] // futures_sink::Sink satisfying for .left/right_sink
#[test]
fn either_sink() {
+ use futures::sink::{Sink, SinkExt};
+ use std::collections::VecDeque;
+ use std::pin::Pin;
+
let mut s = if true {
Vec::<i32>::new().left_sink()
} else {
@@ -47,8 +262,13 @@ fn either_sink() {
Pin::new(&mut s).start_send(0).unwrap();
}
+#[cfg(feature = "executor")] // executor::
#[test]
fn vec_sink() {
+ use futures::executor::block_on;
+ use futures::sink::{Sink, SinkExt};
+ use std::pin::Pin;
+
let mut v = Vec::new();
Pin::new(&mut v).start_send(0).unwrap();
Pin::new(&mut v).start_send(1).unwrap();
@@ -57,8 +277,13 @@ fn vec_sink() {
assert_eq!(v, vec![0, 1]);
}
+#[cfg(feature = "alloc")] // futures_sink::Sink satisfying for .start_send()
#[test]
fn vecdeque_sink() {
+ use futures::sink::Sink;
+ use std::collections::VecDeque;
+ use std::pin::Pin;
+
let mut deque = VecDeque::new();
Pin::new(&mut deque).start_send(2).unwrap();
Pin::new(&mut deque).start_send(3).unwrap();
@@ -68,8 +293,12 @@ fn vecdeque_sink() {
assert_eq!(deque.pop_front(), None);
}
+#[cfg(feature = "executor")] // executor::
#[test]
fn send() {
+ use futures::executor::block_on;
+ use futures::sink::SinkExt;
+
let mut v = Vec::new();
block_on(v.send(0)).unwrap();
@@ -82,8 +311,13 @@ fn send() {
assert_eq!(v, vec![0, 1, 2]);
}
+#[cfg(feature = "executor")] // executor::
#[test]
fn send_all() {
+ use futures::executor::block_on;
+ use futures::sink::SinkExt;
+ use futures::stream::{self, StreamExt};
+
let mut v = Vec::new();
block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap();
@@ -96,66 +330,20 @@ fn send_all() {
assert_eq!(v, vec![0, 1, 2, 3, 4, 5]);
}
-// An Unpark struct that records unpark events for inspection
-struct Flag(AtomicBool);
-
-impl Flag {
- fn new() -> Arc<Self> {
- Arc::new(Self(AtomicBool::new(false)))
- }
-
- fn take(&self) -> bool {
- self.0.swap(false, Ordering::SeqCst)
- }
-
- fn set(&self, v: bool) {
- self.0.store(v, Ordering::SeqCst)
- }
-}
-
-impl ArcWake for Flag {
- fn wake_by_ref(arc_self: &Arc<Self>) {
- arc_self.set(true)
- }
-}
-
-fn flag_cx<F, R>(f: F) -> R
-where
- F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
-{
- let flag = Flag::new();
- let waker = task::waker_ref(&flag);
- let cx = &mut Context::from_waker(&waker);
- f(flag.clone(), cx)
-}
-
-// Sends a value on an i32 channel sink
-struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
-
-impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
- fn new(sink: S, item: Item) -> Self {
- Self(Some(sink), Some(item))
- }
-}
-
-impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
- type Output = Result<S, S::Error>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let Self(inner, item) = self.get_mut();
- {
- let mut inner = inner.as_mut().unwrap();
- ready!(Pin::new(&mut inner).poll_ready(cx))?;
- Pin::new(&mut inner).start_send(item.take().unwrap())?;
- }
- Poll::Ready(Ok(inner.take().unwrap()))
- }
-}
-
// Test that `start_send` on an `mpsc` channel does indeed block when the
// channel is full
+#[cfg(feature = "executor")] // executor::
#[test]
fn mpsc_blocking_start_send() {
+ use futures::channel::mpsc;
+ use futures::executor::block_on;
+ use futures::future::{self, FutureExt};
+
+ use start_send_fut::StartSendFut;
+ use flag_cx::flag_cx;
+ use sassert_next::sassert_next;
+ use unwrap::unwrap;
+
let (mut tx, mut rx) = mpsc::channel::<i32>(0);
block_on(future::lazy(|_| {
@@ -177,8 +365,20 @@ fn mpsc_blocking_start_send() {
// test `flush` by using `with` to make the first insertion into a sink block
// until a oneshot is completed
+#[cfg(feature = "executor")] // executor::
#[test]
fn with_flush() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::{self, FutureExt, TryFutureExt};
+ use futures::never::Never;
+ use futures::sink::{Sink, SinkExt};
+ use std::mem;
+ use std::pin::Pin;
+
+ use flag_cx::flag_cx;
+ use unwrap::unwrap;
+
let (tx, rx) = oneshot::channel();
let mut block = rx.boxed();
let mut sink = Vec::new().with(|elem| {
@@ -203,8 +403,14 @@ fn with_flush() {
}
// test simple use of with to change data
+#[cfg(feature = "executor")] // executor::
#[test]
fn with_as_map() {
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::never::Never;
+ use futures::sink::SinkExt;
+
let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
block_on(sink.send(0)).unwrap();
block_on(sink.send(1)).unwrap();
@@ -213,8 +419,13 @@ fn with_as_map() {
}
// test simple use of with_flat_map
+#[cfg(feature = "executor")] // executor::
#[test]
fn with_flat_map() {
+ use futures::executor::block_on;
+ use futures::sink::SinkExt;
+ use futures::stream::{self, StreamExt};
+
let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
block_on(sink.send(0)).unwrap();
block_on(sink.send(1)).unwrap();
@@ -225,8 +436,19 @@ fn with_flat_map() {
// Check that `with` propagates `poll_ready` to the inner sink.
// Regression test for the issue #1834.
+#[cfg(feature = "executor")] // executor::
#[test]
fn with_propagates_poll_ready() {
+ use futures::channel::mpsc;
+ use futures::executor::block_on;
+ use futures::future;
+ use futures::sink::{Sink, SinkExt};
+ use futures::task::Poll;
+ use std::pin::Pin;
+
+ use flag_cx::flag_cx;
+ use sassert_next::sassert_next;
+
let (tx, mut rx) = mpsc::channel::<i32>(0);
let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));
@@ -249,63 +471,19 @@ fn with_propagates_poll_ready() {
}));
}
-// Immediately accepts all requests to start pushing, but completion is managed
-// by manually flushing
-struct ManualFlush<T: Unpin> {
- data: Vec<T>,
- waiting_tasks: Vec<Waker>,
-}
-
-impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
- type Error = ();
-
- fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
- if let Some(item) = item {
- self.data.push(item);
- } else {
- self.force_flush();
- }
- Ok(())
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if self.data.is_empty() {
- Poll::Ready(Ok(()))
- } else {
- self.waiting_tasks.push(cx.waker().clone());
- Poll::Pending
- }
- }
-
- fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.poll_flush(cx)
- }
-}
-
-impl<T: Unpin> ManualFlush<T> {
- fn new() -> Self {
- Self {
- data: Vec::new(),
- waiting_tasks: Vec::new(),
- }
- }
-
- fn force_flush(&mut self) -> Vec<T> {
- for task in self.waiting_tasks.drain(..) {
- task.wake()
- }
- mem::replace(&mut self.data, Vec::new())
- }
-}
-
// test that the `with` sink doesn't require the underlying sink to flush,
// but doesn't claim to be flushed until the underlying sink is
+#[cfg(feature = "alloc")] // flag_cx
#[test]
fn with_flush_propagate() {
+ use futures::future::{self, FutureExt};
+ use futures::sink::{Sink, SinkExt};
+ use std::pin::Pin;
+
+ use manual_flush::ManualFlush;
+ use flag_cx::flag_cx;
+ use unwrap::unwrap;
+
let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
flag_cx(|flag, cx| {
unwrap(Pin::new(&mut sink).poll_ready(cx));
@@ -325,8 +503,12 @@ fn with_flush_propagate() {
}
// test that a buffer is a no-nop around a sink that always accepts sends
+#[cfg(feature = "executor")] // executor::
#[test]
fn buffer_noop() {
+ use futures::executor::block_on;
+ use futures::sink::SinkExt;
+
let mut sink = Vec::new().buffer(0);
block_on(sink.send(0)).unwrap();
block_on(sink.send(1)).unwrap();
@@ -338,80 +520,21 @@ fn buffer_noop() {
assert_eq!(sink.get_ref(), &[0, 1]);
}
-struct ManualAllow<T: Unpin> {
- data: Vec<T>,
- allow: Rc<Allow>,
-}
-
-struct Allow {
- flag: Cell<bool>,
- tasks: RefCell<Vec<Waker>>,
-}
-
-impl Allow {
- fn new() -> Self {
- Self {
- flag: Cell::new(false),
- tasks: RefCell::new(Vec::new()),
- }
- }
-
- fn check(&self, cx: &mut Context<'_>) -> bool {
- if self.flag.get() {
- true
- } else {
- self.tasks.borrow_mut().push(cx.waker().clone());
- false
- }
- }
-
- fn start(&self) {
- self.flag.set(true);
- let mut tasks = self.tasks.borrow_mut();
- for task in tasks.drain(..) {
- task.wake();
- }
- }
-}
-
-impl<T: Unpin> Sink<T> for ManualAllow<T> {
- type Error = ();
-
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if self.allow.check(cx) {
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
- }
-
- fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
- self.data.push(item);
- Ok(())
- }
-
- fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
- let allow = Rc::new(Allow::new());
- let manual_allow = ManualAllow {
- data: Vec::new(),
- allow: allow.clone(),
- };
- (manual_allow, allow)
-}
-
// test basic buffer functionality, including both filling up to capacity,
// and writing out when the underlying sink is ready
+#[cfg(feature = "executor")] // executor::
+#[cfg(feature = "alloc")] // flag_cx
#[test]
fn buffer() {
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use futures::sink::SinkExt;
+
+ use start_send_fut::StartSendFut;
+ use flag_cx::flag_cx;
+ use unwrap::unwrap;
+ use allowance::manual_allow;
+
let (sink, allow) = manual_allow::<i32>();
let sink = sink.buffer(2);
@@ -429,8 +552,13 @@ fn buffer() {
})
}
+#[cfg(feature = "executor")] // executor::
#[test]
fn fanout_smoke() {
+ use futures::executor::block_on;
+ use futures::sink::SinkExt;
+ use futures::stream::{self, StreamExt};
+
let sink1 = Vec::new();
let sink2 = Vec::new();
let mut sink = sink1.fanout(sink2);
@@ -440,8 +568,20 @@ fn fanout_smoke() {
assert_eq!(sink2, vec![1, 2, 3]);
}
+#[cfg(feature = "executor")] // executor::
+#[cfg(feature = "alloc")] // flag_cx
#[test]
fn fanout_backpressure() {
+ use futures::channel::mpsc;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use futures::sink::SinkExt;
+ use futures::stream::StreamExt;
+
+ use start_send_fut::StartSendFut;
+ use flag_cx::flag_cx;
+ use unwrap::unwrap;
+
let (left_send, mut left_recv) = mpsc::channel(0);
let (right_send, mut right_recv) = mpsc::channel(0);
let sink = left_send.fanout(right_send);
@@ -472,8 +612,15 @@ fn fanout_backpressure() {
})
}
+#[cfg(all(feature = "alloc", feature = "std"))] // channel::mpsc
#[test]
fn sink_map_err() {
+ use futures::channel::mpsc;
+ use futures::sink::{Sink, SinkExt};
+ use futures::task::Poll;
+ use futures_test::task::panic_context;
+ use std::pin::Pin;
+
{
let cx = &mut panic_context();
let (tx, _rx) = mpsc::channel(1);
@@ -489,17 +636,24 @@ fn sink_map_err() {
);
}
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
-struct ErrIntoTest;
-
-impl From<mpsc::SendError> for ErrIntoTest {
- fn from(_: mpsc::SendError) -> Self {
- Self
- }
-}
-
+#[cfg(all(feature = "alloc", feature = "std"))] // channel::mpsc
#[test]
fn err_into() {
+ use futures::channel::mpsc;
+ use futures::sink::{Sink, SinkErrInto, SinkExt};
+ use futures::task::Poll;
+ use futures_test::task::panic_context;
+ use std::pin::Pin;
+
+ #[derive(Copy, Clone, Debug, PartialEq, Eq)]
+ pub struct ErrIntoTest;
+
+ impl From<mpsc::SendError> for ErrIntoTest {
+ fn from(_: mpsc::SendError) -> Self {
+ Self
+ }
+ }
+
{
let cx = &mut panic_context();
let (tx, _rx) = mpsc::channel(1);
diff --git a/tests/sink_fanout.rs b/tests/sink_fanout.rs
index e57b2d8..62f32f2 100644
--- a/tests/sink_fanout.rs
+++ b/tests/sink_fanout.rs
@@ -1,11 +1,12 @@
-use futures::channel::mpsc;
-use futures::executor::block_on;
-use futures::future::join3;
-use futures::sink::SinkExt;
-use futures::stream::{self, StreamExt};
-
+#[cfg(all(feature = "alloc", feature="std", feature="executor"))] // channel::mpsc, executor::
#[test]
fn it_works() {
+ use futures::channel::mpsc;
+ use futures::executor::block_on;
+ use futures::future::join3;
+ use futures::sink::SinkExt;
+ use futures::stream::{self, StreamExt};
+
let (tx1, rx1) = mpsc::channel(1);
let (tx2, rx2) = mpsc::channel(2);
let tx = tx1.fanout(tx2).sink_map_err(|_| ());
diff --git a/tests/split.rs b/tests/split.rs
index 9f4f1a0..140cf3c 100644
--- a/tests/split.rs
+++ b/tests/split.rs
@@ -1,65 +1,66 @@
-use futures::executor::block_on;
-use futures::sink::{Sink, SinkExt};
-use futures::stream::{self, Stream, StreamExt};
-use futures::task::{Context, Poll};
-use pin_utils::unsafe_pinned;
-use std::pin::Pin;
+#[cfg(feature = "executor")] // executor::
+#[test]
+fn test_split() {
+ use futures::executor::block_on;
+ use futures::sink::{Sink, SinkExt};
+ use futures::stream::{self, Stream, StreamExt};
+ use futures::task::{Context, Poll};
+ use pin_utils::unsafe_pinned;
+ use std::pin::Pin;
-struct Join<T, U> {
- stream: T,
- sink: U
-}
+ struct Join<T, U> {
+ stream: T,
+ sink: U
+ }
-impl<T, U> Join<T, U> {
- unsafe_pinned!(stream: T);
- unsafe_pinned!(sink: U);
-}
+ impl<T, U> Join<T, U> {
+ unsafe_pinned!(stream: T);
+ unsafe_pinned!(sink: U);
+ }
-impl<T: Stream, U> Stream for Join<T, U> {
- type Item = T::Item;
+ impl<T: Stream, U> Stream for Join<T, U> {
+ type Item = T::Item;
- fn poll_next(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<T::Item>> {
- self.stream().poll_next(cx)
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<T::Item>> {
+ self.stream().poll_next(cx)
+ }
}
-}
-impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
- type Error = U::Error;
+ impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
+ type Error = U::Error;
- fn poll_ready(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- self.sink().poll_ready(cx)
- }
+ fn poll_ready(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_ready(cx)
+ }
- fn start_send(
- self: Pin<&mut Self>,
- item: Item,
- ) -> Result<(), Self::Error> {
- self.sink().start_send(item)
- }
+ fn start_send(
+ self: Pin<&mut Self>,
+ item: Item,
+ ) -> Result<(), Self::Error> {
+ self.sink().start_send(item)
+ }
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- self.sink().poll_flush(cx)
- }
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_flush(cx)
+ }
- fn poll_close(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- self.sink().poll_close(cx)
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_close(cx)
+ }
}
-}
-#[test]
-fn test_split() {
let mut dest: Vec<i32> = Vec::new();
{
let join = Join {
diff --git a/tests/stream.rs b/tests/stream.rs
index fd6a8b6..09fe9e2 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -1,8 +1,9 @@
-use futures::executor::block_on;
-use futures::stream::{self, StreamExt};
-
+#[cfg(feature = "executor")] // executor::
#[test]
fn select() {
+ use futures::executor::block_on;
+ use futures::stream::{self, StreamExt};
+
fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
let a = stream::iter(a);
let b = stream::iter(b);
@@ -15,14 +16,38 @@ fn select() {
select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]);
}
+#[cfg(feature = "executor")] // executor::
+#[test]
+fn flat_map() {
+ use futures::stream::{self, StreamExt};
+
+ futures::executor::block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(0..=2),
+ ]);
+
+ let values: Vec<_> = st
+ .flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0)))
+ .collect()
+ .await;
+
+ assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]);
+ });
+}
+
+#[cfg(feature = "executor")] // executor::
#[test]
fn scan() {
+ use futures::stream::{self, StreamExt};
+
futures::executor::block_on(async {
assert_eq!(
stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
- .scan(1, |acc, e| {
- *acc += 1;
- futures::future::ready(if e < *acc { Some(e) } else { None })
+ .scan(1, |state, e| {
+ *state += 1;
+ futures::future::ready(if e < *state { Some(e) } else { None })
})
.collect::<Vec<_>>()
.await,
@@ -30,3 +55,111 @@ fn scan() {
);
});
}
+
+#[cfg(feature = "executor")] // executor::
+#[test]
+fn take_until() {
+ use futures::future::{self, Future};
+ use futures::stream::{self, StreamExt};
+ use futures::task::Poll;
+
+ fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
+ let mut i = 0;
+ future::poll_fn(move |_cx| {
+ i += 1;
+ if i <= stop_on {
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
+ })
+ }
+
+ futures::executor::block_on(async {
+ // Verify stopping works:
+ let stream = stream::iter(1u32..=10);
+ let stop_fut = make_stop_fut(5);
+
+ let stream = stream.take_until(stop_fut);
+ let last = stream.fold(0, |_, i| async move { i }).await;
+ assert_eq!(last, 5);
+
+ // Verify take_future() works:
+ let stream = stream::iter(1..=10);
+ let stop_fut = make_stop_fut(5);
+
+ let mut stream = stream.take_until(stop_fut);
+
+ assert_eq!(stream.next().await, Some(1));
+ assert_eq!(stream.next().await, Some(2));
+
+ stream.take_future();
+
+ let last = stream.fold(0, |_, i| async move { i }).await;
+ assert_eq!(last, 10);
+
+ // Verify take_future() returns None if stream is stopped:
+ let stream = stream::iter(1u32..=10);
+ let stop_fut = make_stop_fut(1);
+ let mut stream = stream.take_until(stop_fut);
+ assert_eq!(stream.next().await, Some(1));
+ assert_eq!(stream.next().await, None);
+ assert!(stream.take_future().is_none());
+
+ // Verify TakeUntil is fused:
+ let mut i = 0;
+ let stream = stream::poll_fn(move |_cx| {
+ i += 1;
+ match i {
+ 1 => Poll::Ready(Some(1)),
+ 2 => Poll::Ready(None),
+ _ => panic!("TakeUntil not fused"),
+ }
+ });
+
+ let stop_fut = make_stop_fut(1);
+ let mut stream = stream.take_until(stop_fut);
+ assert_eq!(stream.next().await, Some(1));
+ assert_eq!(stream.next().await, None);
+ assert_eq!(stream.next().await, None);
+ });
+}
+
+#[test]
+#[should_panic]
+fn ready_chunks_panic_on_cap_zero() {
+ use futures::channel::mpsc;
+ use futures::stream::StreamExt;
+
+ let (_, rx1) = mpsc::channel::<()>(1);
+
+ let _ = rx1.ready_chunks(0);
+}
+
+#[cfg(feature = "executor")] // executor::
+#[test]
+fn ready_chunks() {
+ use futures::channel::mpsc;
+ use futures::stream::StreamExt;
+ use futures::sink::SinkExt;
+ use futures::FutureExt;
+ use futures_test::task::noop_context;
+
+ let (mut tx, rx1) = mpsc::channel::<i32>(16);
+
+ let mut s = rx1.ready_chunks(2);
+
+ let mut cx = noop_context();
+ assert!(s.next().poll_unpin(&mut cx).is_pending());
+
+ futures::executor::block_on(async {
+ tx.send(1).await.unwrap();
+
+ assert_eq!(s.next().await.unwrap(), vec![1]);
+ tx.send(2).await.unwrap();
+ tx.send(3).await.unwrap();
+ tx.send(4).await.unwrap();
+ assert_eq!(s.next().await.unwrap(), vec![2,3]);
+ assert_eq!(s.next().await.unwrap(), vec![4]);
+ });
+} \ No newline at end of file
diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs
index 8b23a0a..94c7a75 100644
--- a/tests/stream_catch_unwind.rs
+++ b/tests/stream_catch_unwind.rs
@@ -1,8 +1,9 @@
-use futures::executor::block_on_stream;
-use futures::stream::{self, StreamExt};
-
+#[cfg(feature = "executor")]
#[test]
fn panic_in_the_middle_of_the_stream() {
+ use futures::executor::block_on_stream;
+ use futures::stream::{self, StreamExt};
+
let stream = stream::iter(vec![Some(10), None, Some(11)]);
// panic on second element
@@ -14,8 +15,12 @@ fn panic_in_the_middle_of_the_stream() {
assert!(iter.next().is_none());
}
+#[cfg(feature = "executor")]
#[test]
fn no_panic() {
+ use futures::executor::block_on_stream;
+ use futures::stream::{self, StreamExt};
+
let stream = stream::iter(vec![10, 11, 12]);
let mut iter = block_on_stream(stream.catch_unwind());
diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs
index c528af0..1b26233 100644
--- a/tests/stream_into_async_read.rs
+++ b/tests/stream_into_async_read.rs
@@ -1,51 +1,32 @@
-use core::pin::Pin;
-use futures::io::{AsyncRead, AsyncBufRead};
-use futures::stream::{self, TryStreamExt};
-use futures::task::Poll;
-use futures_test::{task::noop_context, stream::StreamTestExt};
-
-macro_rules! assert_read {
- ($reader:expr, $buf:expr, $item:expr) => {
- let mut cx = noop_context();
- loop {
- match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
- Poll::Ready(Ok(x)) => {
- assert_eq!(x, $item);
- break;
- }
- Poll::Ready(Err(err)) => {
- panic!("assertion failed: expected value but got {}", err);
- }
- Poll::Pending => {
- continue;
- }
- }
- }
- };
-}
-
-macro_rules! assert_fill_buf {
- ($reader:expr, $buf:expr) => {
- let mut cx = noop_context();
- loop {
- match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
- Poll::Ready(Ok(x)) => {
- assert_eq!(x, $buf);
- break;
- }
- Poll::Ready(Err(err)) => {
- panic!("assertion failed: expected value but got {}", err);
- }
- Poll::Pending => {
- continue;
+#[cfg(feature = "std")] // io::
+#[test]
+fn test_into_async_read() {
+ use core::pin::Pin;
+ use futures::io::AsyncRead;
+ use futures::stream::{self, TryStreamExt};
+ use futures::task::Poll;
+ use futures_test::{task::noop_context, stream::StreamTestExt};
+
+ macro_rules! assert_read {
+ ($reader:expr, $buf:expr, $item:expr) => {
+ let mut cx = noop_context();
+ loop {
+ match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
+ Poll::Ready(Ok(x)) => {
+ assert_eq!(x, $item);
+ break;
+ }
+ Poll::Ready(Err(err)) => {
+ panic!("assertion failed: expected value but got {}", err);
+ }
+ Poll::Pending => {
+ continue;
+ }
}
}
- }
- };
-}
+ };
+ }
-#[test]
-fn test_into_async_read() {
let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
let mut reader = stream.interleave_pending().into_async_read();
let mut buf = vec![0; 3];
@@ -71,8 +52,35 @@ fn test_into_async_read() {
assert_read!(reader, &mut buf, 0);
}
+#[cfg(feature = "std")] // io::
#[test]
fn test_into_async_bufread() -> std::io::Result<()> {
+ use core::pin::Pin;
+ use futures::io::AsyncBufRead;
+ use futures::stream::{self, TryStreamExt};
+ use futures::task::Poll;
+ use futures_test::{task::noop_context, stream::StreamTestExt};
+
+ macro_rules! assert_fill_buf {
+ ($reader:expr, $buf:expr) => {
+ let mut cx = noop_context();
+ loop {
+ match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+ Poll::Ready(Ok(x)) => {
+ assert_eq!(x, $buf);
+ break;
+ }
+ Poll::Ready(Err(err)) => {
+ panic!("assertion failed: expected value but got {}", err);
+ }
+ Poll::Pending => {
+ continue;
+ }
+ }
+ }
+ };
+ }
+
let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
let mut reader = stream.interleave_pending().into_async_read();
diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs
index b65a057..c49cd72 100644
--- a/tests/stream_peekable.rs
+++ b/tests/stream_peekable.rs
@@ -1,9 +1,10 @@
-use futures::executor::block_on;
-use futures::pin_mut;
-use futures::stream::{self, Peekable, StreamExt};
-
+#[cfg(feature = "executor")] // executor::
#[test]
fn peekable() {
+ use futures::executor::block_on;
+ use futures::pin_mut;
+ use futures::stream::{self, Peekable, StreamExt};
+
block_on(async {
let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable();
pin_mut!(peekable);
diff --git a/tests/stream_select_all.rs b/tests/stream_select_all.rs
index eb711dd..411cb73 100644
--- a/tests/stream_select_all.rs
+++ b/tests/stream_select_all.rs
@@ -1,12 +1,11 @@
-use futures::channel::mpsc;
-use futures::executor::block_on_stream;
-use futures::future::{self, FutureExt};
-use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
-use futures::task::Poll;
-use futures_test::task::noop_context;
-
+#[cfg(feature = "alloc")] // stream::SelectAll
#[test]
fn is_terminated() {
+ use futures::future::{self, FutureExt};
+ use futures::stream::{FusedStream, SelectAll, StreamExt};
+ use futures::task::Poll;
+ use futures_test::task::noop_context;
+
let mut cx = noop_context();
let mut tasks = SelectAll::new();
@@ -30,8 +29,12 @@ fn is_terminated() {
assert_eq!(tasks.is_terminated(), true);
}
+#[cfg(feature = "executor")] // executor::
#[test]
fn issue_1626() {
+ use futures::executor::block_on_stream;
+ use futures::stream;
+
let a = stream::iter(0..=2);
let b = stream::iter(10..=14);
@@ -48,8 +51,14 @@ fn issue_1626() {
assert_eq!(s.next(), None);
}
+#[cfg(all(feature = "alloc", feature = "std"))] // channel::mpsc
+#[cfg(feature = "executor")] // executor::
#[test]
fn works_1() {
+ use futures::channel::mpsc;
+ use futures::executor::block_on_stream;
+ use futures::stream::select_all;
+
let (a_tx, a_rx) = mpsc::unbounded::<u32>();
let (b_tx, b_rx) = mpsc::unbounded::<u32>();
let (c_tx, c_rx) = mpsc::unbounded::<u32>();
diff --git a/tests/stream_select_next_some.rs b/tests/stream_select_next_some.rs
index 09d7e89..f2b3af2 100644
--- a/tests/stream_select_next_some.rs
+++ b/tests/stream_select_next_some.rs
@@ -1,12 +1,12 @@
-use futures::{future, select};
-use futures::future::{FusedFuture, FutureExt};
-use futures::stream::{FuturesUnordered, StreamExt};
-use futures::task::{Context, Poll};
-use futures_test::future::FutureTestExt;
-use futures_test::task::new_count_waker;
-
+#[cfg(feature = "alloc")] // stream::FuturesUnordered
#[test]
fn is_terminated() {
+ use futures::future;
+ use futures::future::{FusedFuture, FutureExt};
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures::task::{Context, Poll};
+ use futures_test::task::new_count_waker;
+
let (waker, counter) = new_count_waker();
let mut cx = Context::from_waker(&waker);
@@ -29,8 +29,14 @@ fn is_terminated() {
assert_eq!(select_next_some.is_terminated(), true);
}
+#[cfg(all(feature = "async-await", feature = "std"))] // futures::select
+#[cfg(feature = "executor")] // executor::
#[test]
fn select() {
+ use futures::{future, select};
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures_test::future::FutureTestExt;
+
// Checks that even though `async_tasks` will yield a `None` and return
// `is_terminated() == true` during the first poll, it manages to toggle
// back to having items after a future is pushed into it during the second
@@ -56,8 +62,15 @@ fn select() {
}
// Check that `select!` macro does not fail when importing from `futures_util`.
+#[cfg(feature = "alloc")] // stream::FuturesUnordered
+#[cfg(feature = "async-await")] // futures_util::select turned on
+#[cfg(feature = "executor")] // executor::
#[test]
fn futures_util_select() {
+ use futures::future;
+ use futures::stream::{FuturesUnordered, StreamExt};
+ use futures_test::future::FutureTestExt;
+
use futures_util::select;
// Checks that even though `async_tasks` will yield a `None` and return
diff --git a/tests/try_join.rs b/tests/try_join.rs
index 6c6d084..0861c1e 100644
--- a/tests/try_join.rs
+++ b/tests/try_join.rs
@@ -1,3 +1,5 @@
+#![cfg(feature = "executor")] // executor::
+#![cfg(feature = "async-await")] // try_join!
#![deny(unreachable_code)]
use futures::{try_join, executor::block_on};
diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs
index 662b866..1097a36 100644
--- a/tests/try_join_all.rs
+++ b/tests/try_join_all.rs
@@ -1,19 +1,26 @@
-use futures_util::future::*;
-use std::future::Future;
-use futures::executor::block_on;
-use std::fmt::Debug;
-
-fn assert_done<T, F>(actual_fut: F, expected: T)
-where
- T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
-{
- let output = block_on(actual_fut());
- assert_eq!(output, expected);
+#[cfg(feature = "executor")] // executor::
+mod util {
+ use std::future::Future;
+ use futures::executor::block_on;
+ use std::fmt::Debug;
+
+ pub fn assert_done<T, F>(actual_fut: F, expected: T)
+ where
+ T: PartialEq + Debug,
+ F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
+ {
+ let output = block_on(actual_fut());
+ assert_eq!(output, expected);
+ }
}
+#[cfg(feature = "executor")] // assert_done
#[test]
fn collect_collects() {
+ use futures_util::future::{err, ok, try_join_all};
+
+ use util::assert_done;
+
assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
@@ -23,8 +30,14 @@ fn collect_collects() {
// TODO: needs more tests
}
+#[cfg(feature = "executor")] // assert_done
#[test]
fn try_join_all_iter_lifetime() {
+ use futures_util::future::{ok, try_join_all};
+ use std::future::Future;
+
+ use util::assert_done;
+
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
// conservative type parameterization of `TryJoinAll`.
fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
@@ -35,8 +48,13 @@ fn try_join_all_iter_lifetime() {
assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3 as usize, 0, 1]));
}
+#[cfg(feature = "executor")] // assert_done
#[test]
fn try_join_all_from_iter() {
+ use futures_util::future::{ok, TryJoinAll};
+
+ use util::assert_done;
+
assert_done(
|| Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
Ok::<_, usize>(vec![1, 2]),