From 67f2b76b07ae7e83e0f215221ef9b812e35b444d Mon Sep 17 00:00:00 2001 From: Haibo Huang Date: Fri, 8 May 2020 19:24:38 -0700 Subject: Upgrade rust/crates/futures to 0.3.5 Test: None Change-Id: I29d1bd33fb7067b317f1784e5123f63a074b8b8b --- Cargo.toml | 34 ++- Cargo.toml.orig | 26 +- METADATA | 13 +- src/lib.rs | 83 +----- tests/abortable.rs | 21 +- tests/arc_wake.rs | 107 +++---- tests/async_await_macros.rs | 116 +++++++- tests/atomic_waker.rs | 19 +- tests/buffer_unordered.rs | 15 +- tests/eager_drop.rs | 169 ++++++----- tests/eventual.rs | 1 + tests/future_try_flatten_stream.rs | 98 +++--- tests/futures_ordered.rs | 31 +- tests/futures_unordered.rs | 78 ++++- tests/inspect.rs | 7 +- tests/io_buf_reader.rs | 287 ++++++++++-------- tests/io_buf_writer.rs | 222 ++++++++------ tests/io_cursor.rs | 20 +- tests/io_lines.rs | 67 +++-- tests/io_read.rs | 58 ++-- tests/io_read_exact.rs | 7 +- tests/io_read_line.rs | 39 ++- tests/io_read_to_string.rs | 39 +-- tests/io_read_until.rs | 40 ++- tests/io_window.rs | 1 + tests/io_write.rs | 70 +++-- tests/join_all.rs | 44 ++- tests/macro_comma_support.rs | 37 ++- tests/mutex.rs | 31 +- tests/object_safety.rs | 1 + tests/oneshot.rs | 43 ++- tests/ready_queue.rs | 54 +++- tests/recurse.rs | 11 +- tests/select_all.rs | 9 +- tests/select_ok.rs | 11 +- tests/shared.rs | 107 +++++-- tests/sink.rs | 594 +++++++++++++++++++++++-------------- tests/sink_fanout.rs | 13 +- tests/split.rs | 101 +++---- tests/stream.rs | 145 ++++++++- tests/stream_catch_unwind.rs | 11 +- tests/stream_into_async_read.rs | 98 +++--- tests/stream_peekable.rs | 9 +- tests/stream_select_all.rs | 23 +- tests/stream_select_next_some.rs | 27 +- tests/try_join.rs | 2 + tests/try_join_all.rs | 42 ++- 47 files changed, 1977 insertions(+), 1104 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3f93105..290b3cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,11 @@ [package] edition = "2018" name = "futures" -version = "0.3.4" +version = "0.3.5" authors = ["Alex Crichton "] description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures/0.3.0" +documentation = "https://docs.rs/futures/0.3.5" readme = "../README.md" keywords = ["futures", "async", "future"] categories = ["asynchronous"] @@ -29,35 +29,50 @@ all-features = true [package.metadata.playground] features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"] [dependencies.futures-channel] -version = "0.3.4" +version = "0.3.5" features = ["sink"] default-features = false [dependencies.futures-core] -version = "0.3.4" +version = "0.3.5" default-features = false [dependencies.futures-executor] -version = "0.3.4" +version = "0.3.5" optional = true default-features = false [dependencies.futures-io] -version = "0.3.4" +version = "0.3.5" default-features = false [dependencies.futures-sink] -version = "0.3.4" +version = "0.3.5" default-features = false [dependencies.futures-task] -version = "0.3.4" +version = "0.3.5" default-features = false [dependencies.futures-util] -version = "0.3.4" +version = "0.3.5" features = ["sink"] default-features = false +[dev-dependencies.assert_matches] +version = "1.3.0" + +[dev-dependencies.futures-executor] +version = "0.3.5" +features = ["thread-pool"] + +[dev-dependencies.futures-test] +version = "0.3.5" + +[dev-dependencies.pin-utils] +version = "0.1.0" + +[dev-dependencies.tokio] +version = "0.1.11" [features] alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"] @@ -72,5 +87,6 @@ read-initializer = ["futures-io/read-initializer", "futures-util/read-initialize std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"] thread-pool = ["executor", "futures-executor/thread-pool"] unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"] +write-all-vectored = ["futures-util/write-all-vectored"] [badges.travis-ci] repository = "rust-lang/futures-rs" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 7095c19..c7288ed 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,14 +1,14 @@ [package] name = "futures" edition = "2018" -version = "0.3.4" +version = "0.3.5" authors = ["Alex Crichton "] license = "MIT OR Apache-2.0" readme = "../README.md" keywords = ["futures", "async", "future"] repository = "https://github.com/rust-lang/futures-rs" homepage = "https://rust-lang.github.io/futures-rs" -documentation = "https://docs.rs/futures/0.3.0" +documentation = "https://docs.rs/futures/0.3.5" description = """ An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces. @@ -19,13 +19,20 @@ categories = ["asynchronous"] travis-ci = { repository = "rust-lang/futures-rs" } [dependencies] -futures-core = { path = "../futures-core", version = "0.3.4", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.4", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.4", default-features = false, features = ["sink"] } -futures-executor = { path = "../futures-executor", version = "0.3.4", default-features = false, optional = true } -futures-io = { path = "../futures-io", version = "0.3.4", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.3.4", default-features = false } -futures-util = { path = "../futures-util", version = "0.3.4", default-features = false, features = ["sink"] } +futures-core = { path = "../futures-core", version = "0.3.5", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.5", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.5", default-features = false, features = ["sink"] } +futures-executor = { path = "../futures-executor", version = "0.3.5", default-features = false, optional = true } +futures-io = { path = "../futures-io", version = "0.3.5", default-features = false } +futures-sink = { path = "../futures-sink", version = "0.3.5", default-features = false } +futures-util = { path = "../futures-util", version = "0.3.5", default-features = false, features = ["sink"] } + +[dev-dependencies] +pin-utils = "0.1.0" +futures-executor = { path = "../futures-executor", version = "0.3.5", features = ["thread-pool"] } +futures-test = { path = "../futures-test", version = "0.3.5" } +tokio = "0.1.11" +assert_matches = "1.3.0" [features] default = ["std", "async-await", "executor"] @@ -44,6 +51,7 @@ unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/u cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"] bilock = ["futures-util/bilock"] read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"] +write-all-vectored = ["futures-util/write-all-vectored"] [package.metadata.docs.rs] all-features = true diff --git a/METADATA b/METADATA index 42d8dba..06ea8d4 100644 --- a/METADATA +++ b/METADATA @@ -1,8 +1,5 @@ name: "futures" -description: - "An implementation of futures and streams featuring zero allocations, " - "composability, and iterator-like interfaces." - +description: "An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces." third_party { url { type: HOMEPAGE @@ -12,7 +9,11 @@ third_party { type: GIT value: "https://github.com/rust-lang/futures-rs" } - version: "0.3.4" - last_upgrade_date { year: 2020 month: 3 day: 17 } + version: "0.3.5" license_type: NOTICE + last_upgrade_date { + year: 2020 + month: 5 + day: 8 + } } diff --git a/src/lib.rs b/src/lib.rs index 3cdb3d3..e6380d2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,7 +90,7 @@ #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] -#![doc(html_root_url = "https://docs.rs/futures/0.3.0")] +#![doc(html_root_url = "https://docs.rs/futures/0.3.5")] #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))] compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features"); @@ -118,9 +118,11 @@ compile_error!("The `read-initializer` feature requires the `unstable` feature a // Macro reexports pub use futures_core::ready; // Readiness propagation pub use futures_util::pin_mut; +#[cfg(feature = "async-await")] +pub use futures_util::{pending, poll, join, try_join, select_biased}; // Async-await #[cfg(feature = "std")] #[cfg(feature = "async-await")] -pub use futures_util::{pending, poll}; // Async-await +pub use futures_util::select; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] @@ -332,7 +334,7 @@ pub mod io { BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf, empty, Empty, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat, - Repeat, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf, + Repeat, ReuniteError, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf, WriteVectored, }; } @@ -442,8 +444,8 @@ pub mod stream { try_unfold, TryUnfold, StreamExt, - Chain, Collect, Concat, Enumerate, Filter, FilterMap, Flatten, Fold, - Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next, + Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten, + Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next, SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeWhile, Then, Zip, @@ -458,7 +460,7 @@ pub mod stream { #[cfg(feature = "alloc")] pub use futures_util::stream::{ // For StreamExt: - Chunks, + Chunks, ReadyChunks, }; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] @@ -534,72 +536,3 @@ pub mod never { pub use futures_util::never::Never; } - -// proc-macro re-export -------------------------------------- - -// Not public API. -#[doc(hidden)] -pub use futures_core::core_reexport; - -// Not public API. -#[cfg(feature = "async-await")] -#[doc(hidden)] -pub use futures_util::async_await; - -// Not public API. -#[cfg(feature = "async-await")] -#[doc(hidden)] -pub mod inner_macro { - pub use futures_util::join; - pub use futures_util::try_join; - #[cfg(feature = "std")] - pub use futures_util::select; - pub use futures_util::select_biased; -} - -#[cfg(feature = "async-await")] -futures_util::document_join_macro! { - #[macro_export] - macro_rules! join { // replace `::futures_util` with `::futures` as the crate path - ($($tokens:tt)*) => { - $crate::inner_macro::join! { - futures_crate_path ( ::futures ) - $( $tokens )* - } - } - } - - #[macro_export] - macro_rules! try_join { // replace `::futures_util` with `::futures` as the crate path - ($($tokens:tt)*) => { - $crate::inner_macro::try_join! { - futures_crate_path ( ::futures ) - $( $tokens )* - } - } - } -} - -#[cfg(feature = "async-await")] -futures_util::document_select_macro! { - #[cfg(feature = "std")] - #[macro_export] - macro_rules! select { // replace `::futures_util` with `::futures` as the crate path - ($($tokens:tt)*) => { - $crate::inner_macro::select! { - futures_crate_path ( ::futures ) - $( $tokens )* - } - } - } - - #[macro_export] - macro_rules! select_biased { // replace `::futures_util` with `::futures` as the crate path - ($($tokens:tt)*) => { - $crate::inner_macro::select_biased! { - futures_crate_path ( ::futures ) - $( $tokens )* - } - } - } -} diff --git a/tests/abortable.rs b/tests/abortable.rs index 5925c9a..fcbabe9 100644 --- a/tests/abortable.rs +++ b/tests/abortable.rs @@ -1,11 +1,10 @@ -use futures::channel::oneshot; -use futures::executor::block_on; -use futures::future::{abortable, Aborted, FutureExt}; -use futures::task::{Context, Poll}; -use futures_test::task::new_count_waker; - +#[cfg(all(feature = "alloc", feature = "executor"))] #[test] fn abortable_works() { + use futures::channel::oneshot; + use futures::future::{abortable, Aborted}; + use futures::executor::block_on; + let (_tx, a_rx) = oneshot::channel::<()>(); let (abortable_rx, abort_handle) = abortable(a_rx); @@ -13,8 +12,14 @@ fn abortable_works() { assert_eq!(Err(Aborted), block_on(abortable_rx)); } +#[cfg(all(feature = "alloc", feature = "executor"))] #[test] fn abortable_awakens() { + use futures::channel::oneshot; + use futures::future::{abortable, Aborted, FutureExt}; + use futures::task::{Context, Poll}; + use futures_test::task::new_count_waker; + let (_tx, a_rx) = oneshot::channel::<()>(); let (mut abortable_rx, abort_handle) = abortable(a_rx); @@ -28,8 +33,12 @@ fn abortable_awakens() { assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx)); } +#[cfg(all(feature = "alloc", feature = "executor"))] #[test] fn abortable_resolves() { + use futures::channel::oneshot; + use futures::future::abortable; + use futures::executor::block_on; let (tx, a_rx) = oneshot::channel::<()>(); let (abortable_rx, _abort_handle) = abortable(a_rx); diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs index 1940e4f..38217f0 100644 --- a/tests/arc_wake.rs +++ b/tests/arc_wake.rs @@ -1,60 +1,79 @@ -use futures::task::{self, ArcWake, Waker}; -use std::sync::{Arc, Mutex}; +#[cfg(feature = "alloc")] +mod countingwaker { + use futures::task::{self, ArcWake, Waker}; + use std::sync::{Arc, Mutex}; -struct CountingWaker { - nr_wake: Mutex, -} + struct CountingWaker { + nr_wake: Mutex, + } -impl CountingWaker { - fn new() -> CountingWaker { - CountingWaker { - nr_wake: Mutex::new(0), + impl CountingWaker { + fn new() -> CountingWaker { + CountingWaker { + nr_wake: Mutex::new(0), + } } - } - fn wakes(&self) -> i32 { - *self.nr_wake.lock().unwrap() + fn wakes(&self) -> i32 { + *self.nr_wake.lock().unwrap() + } } -} -impl ArcWake for CountingWaker { - fn wake_by_ref(arc_self: &Arc) { - let mut lock = arc_self.nr_wake.lock().unwrap(); - *lock += 1; + impl ArcWake for CountingWaker { + fn wake_by_ref(arc_self: &Arc) { + let mut lock = arc_self.nr_wake.lock().unwrap(); + *lock += 1; + } } -} -#[test] -fn create_waker_from_arc() { - let some_w = Arc::new(CountingWaker::new()); + #[test] + fn create_from_arc() { + let some_w = Arc::new(CountingWaker::new()); - let w1: Waker = task::waker(some_w.clone()); - assert_eq!(2, Arc::strong_count(&some_w)); - w1.wake_by_ref(); - assert_eq!(1, some_w.wakes()); + let w1: Waker = task::waker(some_w.clone()); + assert_eq!(2, Arc::strong_count(&some_w)); + w1.wake_by_ref(); + assert_eq!(1, some_w.wakes()); - let w2 = w1.clone(); - assert_eq!(3, Arc::strong_count(&some_w)); + let w2 = w1.clone(); + assert_eq!(3, Arc::strong_count(&some_w)); - w2.wake_by_ref(); - assert_eq!(2, some_w.wakes()); + w2.wake_by_ref(); + assert_eq!(2, some_w.wakes()); - drop(w2); - assert_eq!(2, Arc::strong_count(&some_w)); - drop(w1); - assert_eq!(1, Arc::strong_count(&some_w)); -} + drop(w2); + assert_eq!(2, Arc::strong_count(&some_w)); + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); + } -struct PanicWaker; + #[test] + fn ref_wake_same() { + let some_w = Arc::new(CountingWaker::new()); -impl ArcWake for PanicWaker { - fn wake_by_ref(_arc_self: &Arc) { - panic!("WAKE UP"); + let w1: Waker = task::waker(some_w.clone()); + let w2 = task::waker_ref(&some_w); + let w3 = w2.clone(); + + assert!(w1.will_wake(&w2)); + assert!(w2.will_wake(&w3)); } } +#[cfg(feature = "alloc")] #[test] fn proper_refcount_on_wake_panic() { + use futures::task::{self, ArcWake, Waker}; + use std::sync::Arc; + + struct PanicWaker; + + impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc) { + panic!("WAKE UP"); + } + } + let some_w = Arc::new(PanicWaker); let w1: Waker = task::waker(some_w.clone()); @@ -63,15 +82,3 @@ fn proper_refcount_on_wake_panic() { drop(w1); assert_eq!(1, Arc::strong_count(&some_w)); // some_w } - -#[test] -fn waker_ref_wake_same() { - let some_w = Arc::new(CountingWaker::new()); - - let w1: Waker = task::waker(some_w.clone()); - let w2 = task::waker_ref(&some_w); - let w3 = w2.clone(); - - assert!(w1.will_wake(&w2)); - assert!(w2.will_wake(&w3)); -} diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs index bc717df..fd2a349 100644 --- a/tests/async_await_macros.rs +++ b/tests/async_await_macros.rs @@ -1,15 +1,12 @@ #![recursion_limit="128"] -use futures::{pending, pin_mut, poll, join, try_join, select}; -use futures::channel::{mpsc, oneshot}; -use futures::executor::block_on; -use futures::future::{self, FutureExt, poll_fn}; -use futures::sink::SinkExt; -use futures::stream::StreamExt; -use futures::task::{Context, Poll}; - +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn poll_and_pending() { + use futures::{pending, pin_mut, poll}; + use futures::executor::block_on; + use futures::task::Poll; + let pending_once = async { pending!() }; block_on(async { pin_mut!(pending_once); @@ -18,8 +15,14 @@ fn poll_and_pending() { }); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn join() { + use futures::{pin_mut, poll, join}; + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::task::Poll; + let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = oneshot::channel::(); @@ -38,8 +41,14 @@ fn join() { }); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select() { + use futures::select; + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::FutureExt; + let (tx1, rx1) = oneshot::channel::(); let (_tx2, rx2) = oneshot::channel::(); tx1.send(1).unwrap(); @@ -56,8 +65,12 @@ fn select() { assert!(ran); } +#[cfg(all(feature = "alloc", feature = "executor", feature = "async-await"))] #[test] fn select_biased() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::FutureExt; use futures::select_biased; let (tx1, rx1) = oneshot::channel::(); @@ -76,8 +89,15 @@ fn select_biased() { assert!(ran); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_streams() { + use futures::select; + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::sink::SinkExt; + use futures::stream::StreamExt; + let (mut tx1, rx1) = mpsc::channel::(1); let (mut tx2, rx2) = mpsc::channel::(1); let mut rx1 = rx1.fuse(); @@ -119,8 +139,14 @@ fn select_streams() { assert!(ran); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_can_move_uncompleted_futures() { + use futures::select; + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::FutureExt; + let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = oneshot::channel::(); tx1.send(1).unwrap(); @@ -145,8 +171,13 @@ fn select_can_move_uncompleted_futures() { assert!(ran); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_nested() { + use futures::select; + use futures::executor::block_on; + use futures::future; + let mut outer_fut = future::ready(1); let mut inner_fut = future::ready(2); let res = block_on(async { @@ -161,8 +192,12 @@ fn select_nested() { assert_eq!(res, 3); } +#[cfg(all(feature = "async-await", feature = "std"))] #[test] fn select_size() { + use futures::select; + use futures::future; + let fut = async { let mut ready = future::ready(0i32); select! { @@ -182,8 +217,13 @@ fn select_size() { assert_eq!(::std::mem::size_of_val(&fut), 40); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_on_non_unpin_expressions() { + use futures::select; + use futures::executor::block_on; + use futures::future::FutureExt; + // The returned Future is !Unpin let make_non_unpin_fut = || { async { 5 @@ -200,8 +240,13 @@ fn select_on_non_unpin_expressions() { assert_eq!(res, 5); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_on_non_unpin_expressions_with_default() { + use futures::select; + use futures::executor::block_on; + use futures::future::FutureExt; + // The returned Future is !Unpin let make_non_unpin_fut = || { async { 5 @@ -219,8 +264,12 @@ fn select_on_non_unpin_expressions_with_default() { assert_eq!(res, 5); } +#[cfg(all(feature = "async-await", feature = "std"))] #[test] fn select_on_non_unpin_size() { + use futures::select; + use futures::future::FutureExt; + // The returned Future is !Unpin let make_non_unpin_fut = || { async { 5 @@ -235,11 +284,16 @@ fn select_on_non_unpin_size() { select_res }; - assert_eq!(48, std::mem::size_of_val(&fut)); + assert_eq!(32, std::mem::size_of_val(&fut)); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_can_be_used_as_expression() { + use futures::select; + use futures::executor::block_on; + use futures::future; + block_on(async { let res = select! { x = future::ready(7) => { x }, @@ -249,8 +303,14 @@ fn select_can_be_used_as_expression() { }); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_with_default_can_be_used_as_expression() { + use futures::select; + use futures::executor::block_on; + use futures::future::{FutureExt, poll_fn}; + use futures::task::{Context, Poll}; + fn poll_always_pending(_cx: &mut Context<'_>) -> Poll { Poll::Pending } @@ -265,8 +325,13 @@ fn select_with_default_can_be_used_as_expression() { }); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_with_complete_can_be_used_as_expression() { + use futures::select; + use futures::executor::block_on; + use futures::future; + block_on(async { let res = select! { x = future::pending::() => { x }, @@ -278,11 +343,16 @@ fn select_with_complete_can_be_used_as_expression() { }); } -async fn require_mutable(_: &mut i32) {} -async fn async_noop() {} - +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { + use futures::select; + use futures::executor::block_on; + use futures::future::FutureExt; + + async fn require_mutable(_: &mut i32) {} + async fn async_noop() {} + block_on(async { let mut value = 234; select! { @@ -294,8 +364,16 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { }); } +#[cfg(all(feature = "async-await", feature = "std", feature = "executor"))] #[test] fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { + use futures::select; + use futures::executor::block_on; + use futures::future::FutureExt; + + async fn require_mutable(_: &mut i32) {} + async fn async_noop() {} + block_on(async { let mut value = 234; select! { @@ -310,8 +388,12 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { }); } +#[cfg(feature = "async-await")] #[test] fn join_size() { + use futures::join; + use futures::future; + let fut = async { let ready = future::ready(0i32); join!(ready) @@ -326,8 +408,12 @@ fn join_size() { assert_eq!(::std::mem::size_of_val(&fut), 28); } +#[cfg(feature = "async-await")] #[test] fn try_join_size() { + use futures::try_join; + use futures::future; + let fut = async { let ready = future::ready(Ok::(0)); try_join!(ready) @@ -342,15 +428,21 @@ fn try_join_size() { assert_eq!(::std::mem::size_of_val(&fut), 28); } +#[cfg(feature = "async-await")] #[test] fn join_doesnt_require_unpin() { + use futures::join; + let _ = async { join!(async {}, async {}) }; } +#[cfg(feature = "async-await")] #[test] fn try_join_doesnt_require_unpin() { + use futures::try_join; + let _ = async { try_join!( async { Ok::<(), ()>(()) }, diff --git a/tests/atomic_waker.rs b/tests/atomic_waker.rs index d9ce753..5693bd0 100644 --- a/tests/atomic_waker.rs +++ b/tests/atomic_waker.rs @@ -1,14 +1,15 @@ -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::thread; - -use futures::executor::block_on; -use futures::future::poll_fn; -use futures::task::{AtomicWaker, Poll}; - +#[cfg(feature = "executor")] #[test] fn basic() { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use std::sync::Arc; + use std::thread; + + use futures::executor::block_on; + use futures::future::poll_fn; + use futures::task::{AtomicWaker, Poll}; + let atomic_waker = Arc::new(AtomicWaker::new()); let atomic_waker_copy = atomic_waker.clone(); diff --git a/tests/buffer_unordered.rs b/tests/buffer_unordered.rs index 1c559c8..6485a1e 100644 --- a/tests/buffer_unordered.rs +++ b/tests/buffer_unordered.rs @@ -1,13 +1,14 @@ -use futures::channel::{oneshot, mpsc}; -use futures::executor::{block_on, block_on_stream}; -use futures::sink::SinkExt; -use futures::stream::StreamExt; -use std::sync::mpsc as std_mpsc; -use std::thread; - +#[cfg(all(feature = "alloc", feature = "std", feature = "executor"))] #[test] #[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790 fn works() { + use futures::channel::{oneshot, mpsc}; + use futures::executor::{block_on, block_on_stream}; + use futures::sink::SinkExt; + use futures::stream::StreamExt; + use std::sync::mpsc as std_mpsc; + use std::thread; + const N: usize = 4; let (mut tx, rx) = mpsc::channel(1); diff --git a/tests/eager_drop.rs b/tests/eager_drop.rs index 674e401..bfb60a7 100644 --- a/tests/eager_drop.rs +++ b/tests/eager_drop.rs @@ -1,13 +1,9 @@ -use futures::channel::oneshot; -use futures::future::{self, Future, FutureExt, TryFutureExt}; -use futures::task::{Context, Poll}; -use futures_test::future::FutureTestExt; -use pin_utils::unsafe_pinned; -use std::pin::Pin; -use std::sync::mpsc; - #[test] fn map_ok() { + use futures::future::{self, FutureExt, TryFutureExt}; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + // The closure given to `map_ok` should have been dropped by the time `map` // runs. let (tx1, rx1) = mpsc::channel::<()>(); @@ -26,6 +22,10 @@ fn map_ok() { #[test] fn map_err() { + use futures::future::{self, FutureExt, TryFutureExt}; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + // The closure given to `map_err` should have been dropped by the time `map` // runs. let (tx1, rx1) = mpsc::channel::<()>(); @@ -42,76 +42,101 @@ fn map_err() { rx2.recv().unwrap(); } -struct FutureData { - _data: T, - future: F, -} +mod channelled { + use pin_utils::unsafe_pinned; + use futures::future::Future; + use std::pin::Pin; + use futures::task::{Context,Poll}; -impl FutureData { - unsafe_pinned!(future: F); -} - -impl Future for FutureData { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.future().poll(cx) + struct FutureData { + _data: T, + future: F, } -} - -#[test] -fn then_drops_eagerly() { - let (tx0, rx0) = oneshot::channel::<()>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .then(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready(()) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(()).unwrap(); - rx2.recv().unwrap(); -} -#[test] -fn and_then_drops_eagerly() { - let (tx0, rx0) = oneshot::channel::>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); + impl FutureData { + unsafe_pinned!(future: F); + } - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .and_then(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready(Ok(())) - }) - .run_in_background(); + impl Future for FutureData { + type Output = F::Output; - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(Ok(())).unwrap(); - rx2.recv().unwrap(); -} + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.future().poll(cx) + } + } -#[test] -fn or_else_drops_eagerly() { - let (tx0, rx0) = oneshot::channel::>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); + #[cfg(feature = "alloc")] + #[test] + fn then_drops_eagerly() { + use futures::channel::oneshot; + use futures::future::{self, FutureExt, TryFutureExt}; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + + let (tx0, rx0) = oneshot::channel::<()>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } + .then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(()) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(()).unwrap(); + rx2.recv().unwrap(); + } - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .or_else(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready::>(Ok(())) - }) - .run_in_background(); + #[cfg(feature = "alloc")] + #[test] + fn and_then_drops_eagerly() { + use futures::channel::oneshot; + use futures::future::{self, TryFutureExt}; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + + let (tx0, rx0) = oneshot::channel::>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } + .and_then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Ok(())).unwrap(); + rx2.recv().unwrap(); + } - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(Err(())).unwrap(); - rx2.recv().unwrap(); + #[cfg(feature = "alloc")] + #[test] + fn or_else_drops_eagerly() { + use futures::channel::oneshot; + use futures::future::{self, TryFutureExt}; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + + let (tx0, rx0) = oneshot::channel::>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } + .or_else(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready::>(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Err(())).unwrap(); + rx2.recv().unwrap(); + } } diff --git a/tests/eventual.rs b/tests/eventual.rs index bff000d..6835588 100644 --- a/tests/eventual.rs +++ b/tests/eventual.rs @@ -1,3 +1,4 @@ +#![cfg(all(feature = "executor", feature = "thread-pool"))] use futures::channel::oneshot; use futures::executor::ThreadPool; use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; diff --git a/tests/future_try_flatten_stream.rs b/tests/future_try_flatten_stream.rs index 082c5ef..aa85ed0 100644 --- a/tests/future_try_flatten_stream.rs +++ b/tests/future_try_flatten_stream.rs @@ -1,13 +1,10 @@ -use core::marker::PhantomData; -use core::pin::Pin; -use futures::executor::block_on_stream; -use futures::future::{ok, err, TryFutureExt}; -use futures::sink::Sink; -use futures::stream::{self, Stream, StreamExt}; -use futures::task::{Context, Poll}; - +#[cfg(feature = "executor")] #[test] fn successful_future() { + use futures::executor::block_on_stream; + use futures::future::{ok, TryFutureExt}; + use futures::stream::{self, StreamExt}; + let stream_items = vec![17, 19]; let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok)); @@ -19,20 +16,28 @@ fn successful_future() { assert_eq!(None, iter.next()); } -struct PanickingStream { - _marker: PhantomData<(T, E)> -} +#[cfg(feature = "executor")] +#[test] +fn failed_future() { + use core::marker::PhantomData; + use core::pin::Pin; + use futures::executor::block_on_stream; + use futures::future::{err, TryFutureExt}; + use futures::stream::Stream; + use futures::task::{Context, Poll}; -impl Stream for PanickingStream { - type Item = Result; + struct PanickingStream { + _marker: PhantomData<(T, E)> + } + + impl Stream for PanickingStream { + type Item = Result; - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - panic!() + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } } -} -#[test] -fn failed_future() { let future_of_a_stream = err::, _>(10); let stream = future_of_a_stream.try_flatten_stream(); let mut iter = block_on_stream(stream); @@ -40,37 +45,44 @@ fn failed_future() { assert_eq!(None, iter.next()); } -struct StreamSink(PhantomData<(T, E, Item)>); +#[test] +fn assert_impls() { + use core::marker::PhantomData; + use core::pin::Pin; + use futures::sink::Sink; + use futures::stream::Stream; + use futures::task::{Context, Poll}; + use futures::future::{ok, TryFutureExt}; -impl Stream for StreamSink { - type Item = Result; - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - panic!() - } -} + struct StreamSink(PhantomData<(T, E, Item)>); -impl Sink for StreamSink { - type Error = E; - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - panic!() - } - fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> { - panic!() + impl Stream for StreamSink { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } } - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - panic!() - } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - panic!() + + impl Sink for StreamSink { + type Error = E; + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } + fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> { + panic!() + } + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + panic!() + } } -} -fn assert_stream(_: &S) {} -fn assert_sink, Item>(_: &S) {} -fn assert_stream_sink, Item>(_: &S) {} + fn assert_stream(_: &S) {} + fn assert_sink, Item>(_: &S) {} + fn assert_stream_sink, Item>(_: &S) {} -#[test] -fn assert_impls() { let s = ok(StreamSink::<(), (), ()>(PhantomData)).try_flatten_stream(); assert_stream(&s); assert_sink(&s); diff --git a/tests/futures_ordered.rs b/tests/futures_ordered.rs index d06b62f..74a220a 100644 --- a/tests/futures_ordered.rs +++ b/tests/futures_ordered.rs @@ -1,12 +1,11 @@ -use futures::channel::oneshot; -use futures::executor::{block_on, block_on_stream}; -use futures::future::{self, join, Future, FutureExt, TryFutureExt}; -use futures::stream::{StreamExt, FuturesOrdered}; -use futures_test::task::noop_context; -use std::any::Any; - +#[cfg(all(feature = "alloc", feature="executor"))] #[test] fn works_1() { + use futures::channel::oneshot; + use futures::executor::block_on_stream; + use futures::stream::{StreamExt, FuturesOrdered}; + use futures_test::task::noop_context; + let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -26,8 +25,14 @@ fn works_1() { assert_eq!(None, iter.next()); } +#[cfg(feature = "alloc")] #[test] fn works_2() { + use futures::channel::oneshot; + use futures::future::{join, FutureExt}; + use futures::stream::{StreamExt, FuturesOrdered}; + use futures_test::task::noop_context; + let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -46,8 +51,13 @@ fn works_2() { assert!(stream.poll_next_unpin(&mut cx).is_ready()); } +#[cfg(feature = "executor")] #[test] fn from_iterator() { + use futures::executor::block_on; + use futures::future; + use futures::stream::{StreamExt, FuturesOrdered}; + let stream = vec![ future::ready::(1), future::ready::(2), @@ -57,8 +67,15 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::>()), vec![1,2,3]); } +#[cfg(feature = "alloc")] #[test] fn queue_never_unblocked() { + use futures::channel::oneshot; + use futures::future::{self, Future, TryFutureExt}; + use futures::stream::{StreamExt, FuturesOrdered}; + use futures_test::task::noop_context; + use std::any::Any; + let (_a_tx, a_rx) = oneshot::channel::>(); let (b_tx, b_rx) = oneshot::channel::>(); let (c_tx, c_rx) = oneshot::channel::>(); diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs index 57eb98f..3285903 100644 --- a/tests/futures_unordered.rs +++ b/tests/futures_unordered.rs @@ -1,18 +1,11 @@ -use std::marker::Unpin; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; - -use futures::channel::oneshot; -use futures::executor::{block_on, block_on_stream}; -use futures::future::{self, join, Future, FutureExt}; -use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; -use futures::task::{Context, Poll}; -use futures_test::future::FutureTestExt; -use futures_test::task::noop_context; -use futures_test::{assert_stream_done, assert_stream_next}; - +#[cfg(feature = "alloc")] #[test] fn is_terminated() { + use futures::future; + use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; + use futures::task::Poll; + use futures_test::task::noop_context; + let mut cx = noop_context(); let mut tasks = FuturesUnordered::new(); @@ -38,8 +31,13 @@ fn is_terminated() { assert_eq!(tasks.is_terminated(), true); } +#[cfg(all(feature = "alloc", feature = "executor"))] #[test] fn works_1() { + use futures::channel::oneshot; + use futures::executor::block_on_stream; + use futures::stream::FuturesUnordered; + let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -60,8 +58,15 @@ fn works_1() { assert_eq!(None, iter.next()); } +#[cfg(feature = "alloc")] #[test] fn works_2() { + use futures::channel::oneshot; + use futures::future::{join, FutureExt}; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures::task::Poll; + use futures_test::task::noop_context; + let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -83,8 +88,13 @@ fn works_2() { assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(None)); } +#[cfg(feature = "executor")] #[test] fn from_iterator() { + use futures::executor::block_on; + use futures::future; + use futures::stream::{FuturesUnordered, StreamExt}; + let stream = vec![ future::ready::(1), future::ready::(2), @@ -96,8 +106,15 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::>()), vec![1, 2, 3]); } +#[cfg(feature = "alloc")] #[test] fn finished_future() { + use std::marker::Unpin; + use futures::channel::oneshot; + use futures::future::{self, Future, FutureExt}; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures_test::task::noop_context; + let (_a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -121,8 +138,13 @@ fn finished_future() { assert!(stream.poll_next_unpin(cx).is_pending()); } +#[cfg(all(feature = "alloc", feature = "executor"))] #[test] fn iter_mut_cancel() { + use futures::channel::oneshot; + use futures::executor::block_on_stream; + use futures::stream::FuturesUnordered; + let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -147,8 +169,12 @@ fn iter_mut_cancel() { assert_eq!(iter.next(), None); } +#[cfg(feature = "alloc")] #[test] fn iter_mut_len() { + use futures::future; + use futures::stream::FuturesUnordered; + let mut stream = vec![ future::pending::<()>(), future::pending::<()>(), @@ -168,8 +194,18 @@ fn iter_mut_len() { assert!(iter_mut.next().is_none()); } +#[cfg(feature = "executor")] #[test] fn iter_cancel() { + use std::marker::Unpin; + use std::pin::Pin; + use std::sync::atomic::{AtomicBool, Ordering}; + + use futures::executor::block_on_stream; + use futures::future::{self, Future, FutureExt}; + use futures::stream::FuturesUnordered; + use futures::task::{Context, Poll}; + struct AtomicCancel { future: F, cancel: AtomicBool, @@ -213,8 +249,12 @@ fn iter_cancel() { assert_eq!(iter.next(), None); } +#[cfg(feature = "alloc")] #[test] fn iter_len() { + use futures::future; + use futures::stream::FuturesUnordered; + let stream = vec![ future::pending::<()>(), future::pending::<()>(), @@ -234,8 +274,14 @@ fn iter_len() { assert!(iter.next().is_none()); } +#[cfg(feature = "alloc")] #[test] fn futures_not_moved_after_poll() { + use futures::future; + use futures::stream::FuturesUnordered; + use futures_test::future::FutureTestExt; + use futures_test::{assert_stream_done, assert_stream_next}; + // Future that will be ready after being polled twice, // asserting that it does not move. let fut = future::ready(()).pending_once().assert_unmoved(); @@ -246,8 +292,14 @@ fn futures_not_moved_after_poll() { assert_stream_done!(stream); } +#[cfg(feature = "alloc")] #[test] fn len_valid_during_out_of_order_completion() { + use futures::channel::oneshot; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures::task::Poll; + use futures_test::task::noop_context; + // Complete futures out-of-order and add new futures afterwards to ensure // length values remain correct. let (a_tx, a_rx) = oneshot::channel::(); diff --git a/tests/inspect.rs b/tests/inspect.rs index 42f6f73..4cbe477 100644 --- a/tests/inspect.rs +++ b/tests/inspect.rs @@ -1,8 +1,9 @@ -use futures::executor::block_on; -use futures::future::{self, FutureExt}; - +#[cfg(feature = "executor")] #[test] fn smoke() { + use futures::executor::block_on; + use futures::future::{self, FutureExt}; + let mut counter = 0; { diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs index a3d723a..07d934d 100644 --- a/tests/io_buf_reader.rs +++ b/tests/io_buf_reader.rs @@ -1,32 +1,10 @@ -use futures::executor::block_on; -use futures::future::{Future, FutureExt}; -use futures::io::{ - AsyncSeek, AsyncSeekExt, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, - AllowStdIo, BufReader, Cursor, SeekFrom, -}; -use futures::task::{Context, Poll}; -use futures_test::task::noop_context; -use std::cmp; -use std::io; -use std::pin::Pin; - -/// A dummy reader intended at testing short-reads propagation. -struct ShortReader { - lengths: Vec, -} - -impl io::Read for ShortReader { - fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { - Ok(0) - } else { - Ok(self.lengths.remove(0)) - } - } -} - +#[cfg(any(feature = "std", feature = "executor"))] macro_rules! run_fill_buf { ($reader:expr) => {{ + use futures_test::task::noop_context; + use futures::task::Poll; + use std::pin::Pin; + let mut cx = noop_context(); loop { if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { @@ -36,8 +14,83 @@ macro_rules! run_fill_buf { }}; } +#[cfg(any(feature = "std", feature = "executor"))] +mod util { + use futures::future::Future; + pub fn run(mut f: F) -> F::Output { + use futures_test::task::noop_context; + use futures::task::Poll; + use futures::future::FutureExt; + + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } + } +} + +#[cfg(feature = "std")] +mod maybe_pending { + use futures::task::{Context,Poll}; + use std::{cmp,io}; + use std::pin::Pin; + use futures::io::{AsyncRead,AsyncBufRead}; + + pub struct MaybePending<'a> { + inner: &'a [u8], + ready_read: bool, + ready_fill_buf: bool, + } + + impl<'a> MaybePending<'a> { + pub fn new(inner: &'a [u8]) -> Self { + Self { inner, ready_read: false, ready_fill_buf: false } + } + } + + impl AsyncRead for MaybePending<'_> { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + if self.ready_read { + self.ready_read = false; + Pin::new(&mut self.inner).poll_read(cx, buf) + } else { + self.ready_read = true; + Poll::Pending + } + } + } + + impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) + -> Poll> + { + if self.ready_fill_buf { + self.ready_fill_buf = false; + if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready_fill_buf = true; + Poll::Pending + } + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; + } + } +} + +#[cfg(feature = "executor")] #[test] fn test_buffered_reader() { + use futures::executor::block_on; + use futures::io::{AsyncReadExt, BufReader}; + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, inner); @@ -73,8 +126,14 @@ fn test_buffered_reader() { assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); } +#[cfg(feature = "executor")] #[test] fn test_buffered_reader_seek() { + use futures::executor::block_on; + use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom}; + use std::pin::Pin; + use util::run; + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); @@ -88,8 +147,13 @@ fn test_buffered_reader_seek() { assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); } +#[cfg(feature = "executor")] #[test] fn test_buffered_reader_seek_underflow() { + use futures::executor::block_on; + use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom}; + use std::io; + // gimmick reader that yields its position modulo 256 for each byte struct PositionReader { pos: u64 @@ -134,8 +198,28 @@ fn test_buffered_reader_seek_underflow() { assert_eq!(reader.get_ref().get_ref().pos, expected); } +#[cfg(feature = "executor")] #[test] fn test_short_reads() { + use futures::executor::block_on; + use futures::io::{AsyncReadExt, AllowStdIo, BufReader}; + use std::io; + + /// A dummy reader intended at testing short-reads propagation. + struct ShortReader { + lengths: Vec, + } + + impl io::Read for ShortReader { + fn read(&mut self, _: &mut [u8]) -> io::Result { + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } + } + } + let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; let mut reader = BufReader::new(AllowStdIo::new(inner)); let mut buf = [0, 0]; @@ -148,63 +232,13 @@ fn test_short_reads() { assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); } -struct MaybePending<'a> { - inner: &'a [u8], - ready_read: bool, - ready_fill_buf: bool, -} - -impl<'a> MaybePending<'a> { - fn new(inner: &'a [u8]) -> Self { - Self { inner, ready_read: false, ready_fill_buf: false } - } -} - -impl AsyncRead for MaybePending<'_> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll> - { - if self.ready_read { - self.ready_read = false; - Pin::new(&mut self.inner).poll_read(cx, buf) - } else { - self.ready_read = true; - Poll::Pending - } - } -} - -impl AsyncBufRead for MaybePending<'_> { - fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) - -> Poll> - { - if self.ready_fill_buf { - self.ready_fill_buf = false; - if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } - let len = cmp::min(2, self.inner.len()); - Poll::Ready(Ok(&self.inner[0..len])) - } else { - self.ready_fill_buf = true; - Poll::Pending - } - } - - fn consume(mut self: Pin<&mut Self>, amt: usize) { - self.inner = &self.inner[amt..]; - } -} - -fn run(mut f: F) -> F::Output { - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } -} - +#[cfg(feature = "std")] #[test] fn maybe_pending() { + use futures::io::{AsyncReadExt, BufReader}; + use util::run; + use maybe_pending::MaybePending; + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); @@ -240,8 +274,13 @@ fn maybe_pending() { assert_eq!(run(reader.read(&mut buf)).unwrap(), 0); } +#[cfg(feature = "std")] #[test] fn maybe_pending_buf_read() { + use futures::io::{AsyncBufReadExt, BufReader}; + use util::run; + use maybe_pending::MaybePending; + let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); let mut reader = BufReader::with_capacity(2, inner); let mut v = Vec::new(); @@ -258,55 +297,63 @@ fn maybe_pending_buf_read() { assert_eq!(v, []); } -struct MaybePendingSeek<'a> { - inner: Cursor<&'a [u8]>, - ready: bool, -} - -impl<'a> MaybePendingSeek<'a> { - fn new(inner: &'a [u8]) -> Self { - Self { inner: Cursor::new(inner), ready: true } +// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 +#[cfg(feature = "std")] +#[test] +fn maybe_pending_seek() { + use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader, + Cursor, SeekFrom + }; + use futures::task::{Context,Poll}; + use std::io; + use std::pin::Pin; + use util::run; + pub struct MaybePendingSeek<'a> { + inner: Cursor<&'a [u8]>, + ready: bool, } -} -impl AsyncRead for MaybePendingSeek<'_> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll> - { - Pin::new(&mut self.inner).poll_read(cx, buf) + impl<'a> MaybePendingSeek<'a> { + pub fn new(inner: &'a [u8]) -> Self { + Self { inner: Cursor::new(inner), ready: true } + } } -} -impl AsyncBufRead for MaybePendingSeek<'_> { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll> - { - let this: *mut Self = &mut *self as *mut _; - Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) + impl AsyncRead for MaybePendingSeek<'_> { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + Pin::new(&mut self.inner).poll_read(cx, buf) + } } - fn consume(mut self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut self.inner).consume(amt) + impl AsyncBufRead for MaybePendingSeek<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll> + { + let this: *mut Self = &mut *self as *mut _; + Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut self.inner).consume(amt) + } } -} -impl AsyncSeek for MaybePendingSeek<'_> { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll> - { - if self.ready { - self.ready = false; - Pin::new(&mut self.inner).poll_seek(cx, pos) - } else { - self.ready = true; - Poll::Pending + impl AsyncSeek for MaybePendingSeek<'_> { + fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) + -> Poll> + { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_seek(cx, pos) + } else { + self.ready = true; + Poll::Pending + } } } -} -// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 -#[test] -fn maybe_pending_seek() { let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs index 7bdcd16..935335b 100644 --- a/tests/io_buf_writer.rs +++ b/tests/io_buf_writer.rs @@ -1,13 +1,70 @@ -use futures::executor::block_on; -use futures::future::{Future, FutureExt}; -use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; -use futures::task::{Context, Poll}; -use futures_test::task::noop_context; -use std::io; -use std::pin::Pin; +#[cfg(feature = "std")] +mod maybe_pending { + use futures::io::AsyncWrite; + use futures::task::{Context, Poll}; + use std::io; + use std::pin::Pin; + + pub struct MaybePending { + pub inner: Vec, + ready: bool, + } + + impl MaybePending { + pub fn new(inner: Vec) -> Self { + Self { inner, ready: false } + } + } + + impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } + } +} + +#[cfg(any(feature = "std", feature = "executor"))] +mod util { + use futures::future::Future; + + pub fn run(mut f: F) -> F::Output { + use futures::future::FutureExt; + use futures::task::Poll; + use futures_test::task::noop_context; + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } + } +} + +#[cfg(feature = "executor")] #[test] fn buf_writer() { + use futures::executor::block_on; + use futures::io::{AsyncWriteExt, BufWriter}; + let mut writer = BufWriter::with_capacity(2, Vec::new()); block_on(writer.write(&[0, 1])).unwrap(); @@ -48,8 +105,12 @@ fn buf_writer() { assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); } +#[cfg(feature = "executor")] #[test] fn buf_writer_inner_flushes() { + use futures::executor::block_on; + use futures::io::{AsyncWriteExt, BufWriter}; + let mut w = BufWriter::with_capacity(3, Vec::new()); block_on(w.write(&[0, 1])).unwrap(); assert_eq!(*w.get_ref(), []); @@ -58,8 +119,12 @@ fn buf_writer_inner_flushes() { assert_eq!(w, [0, 1]); } +#[cfg(feature = "executor")] #[test] fn buf_writer_seek() { + use futures::executor::block_on; + use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; + // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, // use `Vec::new` instead of `vec![0; 8]`. let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8])); @@ -73,52 +138,14 @@ fn buf_writer_seek() { assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); } -struct MaybePending { - inner: Vec, - ready: bool, -} - -impl MaybePending { - fn new(inner: Vec) -> Self { - Self { inner, ready: false } - } -} - -impl AsyncWrite for MaybePending { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.ready { - self.ready = false; - Pin::new(&mut self.inner).poll_write(cx, buf) - } else { - self.ready = true; - Poll::Pending - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_close(cx) - } -} - -fn run(mut f: F) -> F::Output { - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } -} - +#[cfg(feature = "std")] #[test] fn maybe_pending_buf_writer() { + use futures::io::{AsyncWriteExt, BufWriter}; + + use maybe_pending::MaybePending; + use util::run; + let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); run(writer.write(&[0, 1])).unwrap(); @@ -159,8 +186,14 @@ fn maybe_pending_buf_writer() { assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); } +#[cfg(feature = "std")] #[test] fn maybe_pending_buf_writer_inner_flushes() { + use futures::io::{AsyncWriteExt, BufWriter}; + + use maybe_pending::MaybePending; + use util::run; + let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); run(w.write(&[0, 1])).unwrap(); assert_eq!(&w.get_ref().inner, &[]); @@ -169,59 +202,66 @@ fn maybe_pending_buf_writer_inner_flushes() { assert_eq!(w, [0, 1]); } +#[cfg(feature = "std")] +#[test] +fn maybe_pending_buf_writer_seek() { + use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; + use futures::task::{Context, Poll}; + use std::io; + use std::pin::Pin; -struct MaybePendingSeek { - inner: Cursor>, - ready_write: bool, - ready_seek: bool, -} + use util::run; -impl MaybePendingSeek { - fn new(inner: Vec) -> Self { - Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false } + struct MaybePendingSeek { + inner: Cursor>, + ready_write: bool, + ready_seek: bool, } -} -impl AsyncWrite for MaybePendingSeek { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.ready_write { - self.ready_write = false; - Pin::new(&mut self.inner).poll_write(cx, buf) - } else { - self.ready_write = true; - Poll::Pending + impl MaybePendingSeek { + fn new(inner: Vec) -> Self { + Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false } } } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) - } + impl AsyncWrite for MaybePendingSeek { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready_write { + self.ready_write = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready_write = true; + Poll::Pending + } + } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_close(cx) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) + } } -} -impl AsyncSeek for MaybePendingSeek { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll> - { - if self.ready_seek { - self.ready_seek = false; - Pin::new(&mut self.inner).poll_seek(cx, pos) - } else { - self.ready_seek = true; - Poll::Pending + impl AsyncSeek for MaybePendingSeek { + fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) + -> Poll> + { + if self.ready_seek { + self.ready_seek = false; + Pin::new(&mut self.inner).poll_seek(cx, pos) + } else { + self.ready_seek = true; + Poll::Pending + } } } -} -#[test] -fn maybe_pending_buf_writer_seek() { // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, // use `Vec::new` instead of `vec![0; 8]`. let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8])); diff --git a/tests/io_cursor.rs b/tests/io_cursor.rs index 4f80a75..0a93c83 100644 --- a/tests/io_cursor.rs +++ b/tests/io_cursor.rs @@ -1,11 +1,12 @@ -use assert_matches::assert_matches; -use futures::future::lazy; -use futures::io::{AsyncWrite, Cursor}; -use futures::task::Poll; -use std::pin::Pin; - +#[cfg(all(feature = "std", feature = "executor"))] #[test] fn cursor_asyncwrite_vec() { + use assert_matches::assert_matches; + use futures::future::lazy; + use futures::io::{AsyncWrite, Cursor}; + use futures::task::Poll; + use std::pin::Pin; + let mut cursor = Cursor::new(vec![0; 5]); futures::executor::block_on(lazy(|cx| { assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); @@ -16,8 +17,15 @@ fn cursor_asyncwrite_vec() { assert_eq!(cursor.into_inner(), [1, 2, 3, 4, 5, 6, 6, 7]); } +#[cfg(all(feature = "std", feature = "executor"))] #[test] fn cursor_asyncwrite_box() { + use assert_matches::assert_matches; + use futures::future::lazy; + use futures::io::{AsyncWrite, Cursor}; + use futures::task::Poll; + use std::pin::Pin; + let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice()); futures::executor::block_on(lazy(|cx| { assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); diff --git a/tests/io_lines.rs b/tests/io_lines.rs index 39eafa9..e10edd0 100644 --- a/tests/io_lines.rs +++ b/tests/io_lines.rs @@ -1,19 +1,34 @@ -use futures::executor::block_on; -use futures::future::{Future, FutureExt}; -use futures::stream::{self, StreamExt, TryStreamExt}; -use futures::io::{AsyncBufReadExt, Cursor}; -use futures::task::Poll; -use futures_test::io::AsyncReadTestExt; -use futures_test::task::noop_context; - -macro_rules! block_on_next { - ($expr:expr) => { - block_on($expr.next()).unwrap().unwrap() - }; +#[cfg(any(feature = "std", feature = "executor"))] +mod util { + use futures::future::Future; + + pub fn run(mut f: F) -> F::Output { + use futures_test::task::noop_context; + use futures::task::Poll; + use futures::future::FutureExt; + + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } + } } +#[cfg(feature = "executor")] #[test] fn lines() { + use futures::executor::block_on; + use futures::stream::StreamExt; + use futures::io::{AsyncBufReadExt, Cursor}; + + macro_rules! block_on_next { + ($expr:expr) => { + block_on($expr.next()).unwrap().unwrap() + }; + } + let buf = Cursor::new(&b"12\r"[..]); let mut s = buf.lines(); assert_eq!(block_on_next!(s), "12\r".to_string()); @@ -26,23 +41,21 @@ fn lines() { assert!(block_on(s.next()).is_none()); } -fn run(mut f: F) -> F::Output { - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } -} - -macro_rules! run_next { - ($expr:expr) => { - run($expr.next()).unwrap().unwrap() - }; -} - +#[cfg(feature = "std")] #[test] fn maybe_pending() { + use futures::stream::{self, StreamExt, TryStreamExt}; + use futures::io::AsyncBufReadExt; + use futures_test::io::AsyncReadTestExt; + + use util::run; + + macro_rules! run_next { + ($expr:expr) => { + run($expr.next()).unwrap().unwrap() + }; + } + let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]]) .map(Ok) .into_async_read() diff --git a/tests/io_read.rs b/tests/io_read.rs index f99c4ed..ba68fcc 100644 --- a/tests/io_read.rs +++ b/tests/io_read.rs @@ -1,33 +1,44 @@ -use futures::io::AsyncRead; -use futures_test::task::panic_context; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; +#[cfg(feature = "std")] +mod mock_reader { + use futures::io::AsyncRead; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; -struct MockReader { - fun: Box Poll>>, -} + pub struct MockReader { + fun: Box Poll>>, + } -impl MockReader { - pub fn new(fun: impl FnMut(&mut [u8]) -> Poll> + 'static) -> Self { - MockReader { fun: Box::new(fun) } + impl MockReader { + pub fn new(fun: impl FnMut(&mut [u8]) -> Poll> + 'static) -> Self { + MockReader { fun: Box::new(fun) } + } } -} -impl AsyncRead for MockReader { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8] - ) -> Poll> { - (self.get_mut().fun)(buf) + impl AsyncRead for MockReader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8] + ) -> Poll> { + (self.get_mut().fun)(buf) + } } } /// Verifies that the default implementation of `poll_read_vectored` /// calls `poll_read` with an empty slice if no buffers are provided. +#[cfg(feature = "std")] #[test] fn read_vectored_no_buffers() { + use futures::io::AsyncRead; + use futures_test::task::panic_context; + use std::io; + use std::pin::Pin; + use std::task::Poll; + + use mock_reader::MockReader; + let mut reader = MockReader::new(|buf| { assert_eq!(buf, b""); Err(io::ErrorKind::BrokenPipe.into()).into() @@ -42,8 +53,17 @@ fn read_vectored_no_buffers() { /// Verifies that the default implementation of `poll_read_vectored` /// calls `poll_read` with the first non-empty buffer. +#[cfg(feature = "std")] #[test] fn read_vectored_first_non_empty() { + use futures::io::AsyncRead; + use futures_test::task::panic_context; + use std::io; + use std::pin::Pin; + use std::task::Poll; + + use mock_reader::MockReader; + let mut reader = MockReader::new(|buf| { assert_eq!(buf.len(), 4); buf.copy_from_slice(b"four"); diff --git a/tests/io_read_exact.rs b/tests/io_read_exact.rs index 4941773..a772e34 100644 --- a/tests/io_read_exact.rs +++ b/tests/io_read_exact.rs @@ -1,8 +1,9 @@ -use futures::executor::block_on; -use futures::io::AsyncReadExt; - +#[cfg(feature = "executor")] #[test] fn read_exact() { + use futures::executor::block_on; + use futures::io::AsyncReadExt; + let mut reader: &[u8] = &[1, 2, 3, 4, 5]; let mut out = [0u8; 3]; diff --git a/tests/io_read_line.rs b/tests/io_read_line.rs index d1dba5e..ab25f26 100644 --- a/tests/io_read_line.rs +++ b/tests/io_read_line.rs @@ -1,13 +1,9 @@ -use futures::executor::block_on; -use futures::future::{Future, FutureExt}; -use futures::stream::{self, StreamExt, TryStreamExt}; -use futures::io::{AsyncBufReadExt, Cursor}; -use futures::task::Poll; -use futures_test::io::AsyncReadTestExt; -use futures_test::task::noop_context; - +#[cfg(feature = "executor")] #[test] fn read_line() { + use futures::executor::block_on; + use futures::io::{AsyncBufReadExt, Cursor}; + let mut buf = Cursor::new(b"12"); let mut v = String::new(); assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); @@ -25,17 +21,28 @@ fn read_line() { assert_eq!(v, ""); } -fn run(mut f: F) -> F::Output { - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; +#[cfg(feature = "std")] +#[test] +fn maybe_pending() { + use futures::future::Future; + + fn run(mut f: F) -> F::Output { + use futures::future::FutureExt; + use futures::task::Poll; + use futures_test::task::noop_context; + + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } } } -} -#[test] -fn maybe_pending() { + use futures::stream::{self, StreamExt, TryStreamExt}; + use futures::io::AsyncBufReadExt; + use futures_test::io::AsyncReadTestExt; + let mut buf = b"12".interleave_pending(); let mut v = String::new(); assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); diff --git a/tests/io_read_to_string.rs b/tests/io_read_to_string.rs index db825af..0a79a22 100644 --- a/tests/io_read_to_string.rs +++ b/tests/io_read_to_string.rs @@ -1,13 +1,9 @@ -use futures::executor::block_on; -use futures::future::{Future, FutureExt}; -use futures::stream::{self, StreamExt, TryStreamExt}; -use futures::io::{AsyncReadExt, Cursor}; -use futures::task::Poll; -use futures_test::io::AsyncReadTestExt; -use futures_test::task::noop_context; - +#[cfg(all(feature = "std", feature = "executor"))] #[test] fn read_to_string() { + use futures::executor::block_on; + use futures::io::{AsyncReadExt, Cursor}; + let mut c = Cursor::new(&b""[..]); let mut v = String::new(); assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0); @@ -23,17 +19,26 @@ fn read_to_string() { assert!(block_on(c.read_to_string(&mut v)).is_err()); } -fn run(mut f: F) -> F::Output { - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } -} - +#[cfg(feature = "std")] #[test] fn interleave_pending() { + use futures::future::Future; + use futures::stream::{self, StreamExt, TryStreamExt}; + use futures::io::AsyncReadExt; + use futures_test::io::AsyncReadTestExt; + + fn run(mut f: F) -> F::Output { + use futures::future::FutureExt; + use futures_test::task::noop_context; + use futures::task::Poll; + + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } + } let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]]) .map(Ok) .into_async_read() diff --git a/tests/io_read_until.rs b/tests/io_read_until.rs index 5152281..1e018b7 100644 --- a/tests/io_read_until.rs +++ b/tests/io_read_until.rs @@ -1,13 +1,9 @@ -use futures::executor::block_on; -use futures::future::{Future, FutureExt}; -use futures::stream::{self, StreamExt, TryStreamExt}; -use futures::io::{AsyncBufReadExt, Cursor}; -use futures::task::Poll; -use futures_test::io::AsyncReadTestExt; -use futures_test::task::noop_context; - +#[cfg(feature = "executor")] #[test] fn read_until() { + use futures::executor::block_on; + use futures::io::{AsyncBufReadExt, Cursor}; + let mut buf = Cursor::new(b"12"); let mut v = Vec::new(); assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2); @@ -25,17 +21,29 @@ fn read_until() { assert_eq!(v, []); } -fn run(mut f: F) -> F::Output { - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } -} +#[cfg(feature = "std")] #[test] fn maybe_pending() { + use futures::future::Future; + + fn run(mut f: F) -> F::Output { + use futures::future::FutureExt; + use futures_test::task::noop_context; + use futures::task::Poll; + + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } + } + + use futures::stream::{self, StreamExt, TryStreamExt}; + use futures::io::AsyncBufReadExt; + use futures_test::io::AsyncReadTestExt; + let mut buf = b"12".interleave_pending(); let mut v = Vec::new(); assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2); diff --git a/tests/io_window.rs b/tests/io_window.rs index 98df69c..8cc41a3 100644 --- a/tests/io_window.rs +++ b/tests/io_window.rs @@ -1,3 +1,4 @@ +#![cfg(feature = "std")] use futures::io::Window; #[test] diff --git a/tests/io_write.rs b/tests/io_write.rs index b963444..af0d5c3 100644 --- a/tests/io_write.rs +++ b/tests/io_write.rs @@ -1,41 +1,52 @@ -use futures::io::AsyncWrite; -use futures_test::task::panic_context; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; +#[cfg(feature = "std")] +mod mock_writer { + use futures::io::AsyncWrite; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; -struct MockWriter { - fun: Box Poll>>, -} - -impl MockWriter { - pub fn new(fun: impl FnMut(&[u8]) -> Poll> + 'static) -> Self { - MockWriter { fun: Box::new(fun) } + pub struct MockWriter { + fun: Box Poll>>, } -} -impl AsyncWrite for MockWriter { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - (self.get_mut().fun)(buf) + impl MockWriter { + pub fn new(fun: impl FnMut(&[u8]) -> Poll> + 'static) -> Self { + MockWriter { fun: Box::new(fun) } + } } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - panic!() - } + impl AsyncWrite for MockWriter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + (self.get_mut().fun)(buf) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + panic!() + } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - panic!() + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + panic!() + } } } /// Verifies that the default implementation of `poll_write_vectored` /// calls `poll_write` with an empty slice if no buffers are provided. +#[cfg(feature = "std")] #[test] fn write_vectored_no_buffers() { + use futures::io::AsyncWrite; + use futures_test::task::panic_context; + use std::io; + use std::pin::Pin; + use std::task::Poll; + + use mock_writer::MockWriter; + let mut writer = MockWriter::new(|buf| { assert_eq!(buf, b""); Err(io::ErrorKind::BrokenPipe.into()).into() @@ -50,8 +61,17 @@ fn write_vectored_no_buffers() { /// Verifies that the default implementation of `poll_write_vectored` /// calls `poll_write` with the first non-empty buffer. +#[cfg(feature = "std")] #[test] fn write_vectored_first_non_empty() { + use futures::io::AsyncWrite; + use futures_test::task::panic_context; + use std::io; + use std::pin::Pin; + use std::task::Poll; + + use mock_writer::MockWriter; + let mut writer = MockWriter::new(|buf| { assert_eq!(buf, b"four"); Poll::Ready(Ok(4)) diff --git a/tests/join_all.rs b/tests/join_all.rs index 63967bf..0d8fbf2 100644 --- a/tests/join_all.rs +++ b/tests/join_all.rs @@ -1,29 +1,38 @@ -use futures_util::future::*; -use std::future::Future; -use futures::executor::block_on; -use std::fmt::Debug; - -fn assert_done(actual_fut: F, expected: T) -where - T: PartialEq + Debug, - F: FnOnce() -> Box + Unpin>, -{ - let output = block_on(actual_fut()); - assert_eq!(output, expected); +#[cfg(feature = "executor")] +mod util { + use std::future::Future; + use std::fmt::Debug; + + pub fn assert_done(actual_fut: F, expected: T) + where + T: PartialEq + Debug, + F: FnOnce() -> Box + Unpin>, + { + use futures::executor::block_on; + + let output = block_on(actual_fut()); + assert_eq!(output, expected); + } } +#[cfg(feature = "executor")] #[test] fn collect_collects() { - assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); - assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); + use futures_util::future::{join_all,ready}; + + util::assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); + util::assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); // REVIEW: should this be implemented? // assert_done(|| Box::new(join_all(Vec::::new())), vec![]); // TODO: needs more tests } +#[cfg(feature = "executor")] #[test] fn join_all_iter_lifetime() { + use futures_util::future::{join_all,ready}; + use std::future::Future; // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `JoinAll`. fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box> + Unpin> { @@ -31,12 +40,15 @@ fn join_all_iter_lifetime() { Box::new(join_all(iter)) } - assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]); + util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]); } +#[cfg(feature = "executor")] #[test] fn join_all_from_iter() { - assert_done( + use futures_util::future::{JoinAll,ready}; + + util::assert_done( || Box::new(vec![ready(1), ready(2)].into_iter().collect::>()), vec![1, 2], ) diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs index 111f65a..e6a609b 100644 --- a/tests/macro_comma_support.rs +++ b/tests/macro_comma_support.rs @@ -1,29 +1,41 @@ -#[macro_use] -extern crate futures; - -use futures::{ - executor::block_on, - future::{self, FutureExt}, - task::Poll, -}; - +#[cfg(feature = "executor")] #[test] fn ready() { + use futures::{ + executor::block_on, + future, + task::Poll, + ready, + }; + block_on(future::poll_fn(|_| { ready!(Poll::Ready(()),); Poll::Ready(()) })) } +#[cfg(all(feature = "executor", feature = "async-await"))] #[test] fn poll() { + use futures::{ + executor::block_on, + future::FutureExt, + poll, + }; + block_on(async { let _ = poll!(async {}.boxed(),); }) } +#[cfg(all(feature = "executor", feature = "async-await"))] #[test] fn join() { + use futures::{ + executor::block_on, + join + }; + block_on(async { let future1 = async { 1 }; let future2 = async { 2 }; @@ -31,8 +43,15 @@ fn join() { }) } +#[cfg(all(feature = "executor", feature = "async-await"))] #[test] fn try_join() { + use futures::{ + executor::block_on, + future::FutureExt, + try_join, + }; + block_on(async { let future1 = async { 1 }.never_error(); let future2 = async { 2 }.never_error(); diff --git a/tests/mutex.rs b/tests/mutex.rs index bad53a9..7ee9f41 100644 --- a/tests/mutex.rs +++ b/tests/mutex.rs @@ -1,23 +1,24 @@ -use futures::channel::mpsc; -use futures::executor::block_on; -use futures::future::{ready, FutureExt}; -use futures::lock::Mutex; -use futures::stream::StreamExt; -use futures::task::{Context, SpawnExt}; -use futures_test::future::FutureTestExt; -use futures_test::task::{new_count_waker, panic_context}; -use std::sync::Arc; - +#[cfg(all(feature = "alloc", feature = "std"))] #[test] fn mutex_acquire_uncontested() { + use futures::future::FutureExt; + use futures::lock::Mutex; + use futures_test::task::panic_context; + let mutex = Mutex::new(()); for _ in 0..10 { assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready()); } } +#[cfg(all(feature = "alloc", feature = "std"))] #[test] fn mutex_wakes_waiters() { + use futures::future::FutureExt; + use futures::lock::Mutex; + use futures::task::Context; + use futures_test::task::{new_count_waker, panic_context}; + let mutex = Mutex::new(()); let (waker, counter) = new_count_waker(); let lock = mutex.lock().poll_unpin(&mut panic_context()); @@ -34,8 +35,18 @@ fn mutex_wakes_waiters() { assert!(waiter.poll_unpin(&mut panic_context()).is_ready()); } +#[cfg(feature = "thread-pool")] #[test] fn mutex_contested() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::ready; + use futures::lock::Mutex; + use futures::stream::StreamExt; + use futures::task::SpawnExt; + use futures_test::future::FutureTestExt; + use std::sync::Arc; + let (tx, mut rx) = mpsc::unbounded(); let pool = futures::executor::ThreadPool::builder() .pool_size(16) diff --git a/tests/object_safety.rs b/tests/object_safety.rs index 30c892f..b49ee88 100644 --- a/tests/object_safety.rs +++ b/tests/object_safety.rs @@ -28,6 +28,7 @@ fn sink() { assert_is_object_safe::<&dyn Sink<(), Error = ()>>(); } +#[cfg(feature = "std")] // futures::io #[test] fn io() { // `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt` and `AsyncBufReadExt` are not object safe. diff --git a/tests/oneshot.rs b/tests/oneshot.rs index 58951ec..302160b 100644 --- a/tests/oneshot.rs +++ b/tests/oneshot.rs @@ -1,11 +1,12 @@ -use futures::channel::oneshot; -use futures::future::{FutureExt, TryFutureExt}; -use futures_test::future::FutureTestExt; -use std::sync::mpsc; -use std::thread; - +#[cfg(feature = "alloc")] // channel #[test] fn oneshot_send1() { + use futures::channel::oneshot; + use futures::future::TryFutureExt; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + use std::thread; + let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); @@ -15,8 +16,15 @@ fn oneshot_send1() { t.join().unwrap(); } +#[cfg(feature = "alloc")] // channel #[test] fn oneshot_send2() { + use futures::channel::oneshot; + use futures::future::TryFutureExt; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + use std::thread; + let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); @@ -25,8 +33,15 @@ fn oneshot_send2() { assert_eq!(1, rx2.recv().unwrap()); } +#[cfg(feature = "alloc")] // channel #[test] fn oneshot_send3() { + use futures::channel::oneshot; + use futures::future::TryFutureExt; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + use std::thread; + let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); @@ -35,8 +50,14 @@ fn oneshot_send3() { assert_eq!(1, rx2.recv().unwrap()); } +#[cfg(feature = "alloc")] // channel #[test] fn oneshot_drop_tx1() { + use futures::channel::oneshot; + use futures::future::FutureExt; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); @@ -46,8 +67,15 @@ fn oneshot_drop_tx1() { assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap()); } +#[cfg(feature = "alloc")] // channel #[test] fn oneshot_drop_tx2() { + use futures::channel::oneshot; + use futures::future::FutureExt; + use futures_test::future::FutureTestExt; + use std::sync::mpsc; + use std::thread; + let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); @@ -58,8 +86,11 @@ fn oneshot_drop_tx2() { assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap()); } +#[cfg(feature = "alloc")] // channel #[test] fn oneshot_drop_rx() { + use futures::channel::oneshot; + let (tx, rx) = oneshot::channel::(); drop(rx); assert_eq!(Err(2), tx.send(2)); diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs index 15a0bef..be6ccc3 100644 --- a/tests/ready_queue.rs +++ b/tests/ready_queue.rs @@ -1,18 +1,20 @@ -use futures::channel::oneshot; -use futures::executor::{block_on, block_on_stream}; -use futures::future; -use futures::stream::{FuturesUnordered, StreamExt}; -use futures::task::Poll; -use futures_test::task::noop_context; -use std::panic::{self, AssertUnwindSafe}; -use std::sync::{Arc, Barrier}; -use std::thread; - -trait AssertSendSync: Send + Sync {} -impl AssertSendSync for FuturesUnordered<()> {} +#[cfg(feature = "alloc")] // FuturesUnordered +mod assert_send_sync { + use futures::stream::FuturesUnordered; + pub trait AssertSendSync: Send + Sync {} + impl AssertSendSync for FuturesUnordered<()> {} +} + +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn basic_usage() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures::task::Poll; + block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); @@ -39,8 +41,15 @@ fn basic_usage() { })); } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn resolving_errors() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures::task::Poll; + block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); @@ -67,8 +76,15 @@ fn resolving_errors() { })); } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn dropping_ready_queue() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future; + use futures::stream::FuturesUnordered; + use futures_test::task::noop_context; + block_on(future::lazy(move |_| { let queue = FuturesUnordered::new(); let (mut tx1, rx1) = oneshot::channel::<()>(); @@ -94,8 +110,15 @@ fn dropping_ready_queue() { })); } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn stress() { + use futures::channel::oneshot; + use futures::executor::block_on_stream; + use futures::stream::FuturesUnordered; + use std::sync::{Arc, Barrier}; + use std::thread; + const ITER: usize = 300; for i in 0..ITER { @@ -137,8 +160,15 @@ fn stress() { } } +#[cfg(feature = "executor")] // executor #[test] fn panicking_future_dropped() { + use futures::executor::block_on; + use futures::future; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures::task::Poll; + use std::panic::{self, AssertUnwindSafe}; + block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); queue.push(future::poll_fn(|_| -> Poll> { panic!() })); diff --git a/tests/recurse.rs b/tests/recurse.rs index 2920a41..d3f4124 100644 --- a/tests/recurse.rs +++ b/tests/recurse.rs @@ -1,10 +1,11 @@ -use futures::executor::block_on; -use futures::future::{self, FutureExt, BoxFuture}; -use std::sync::mpsc; -use std::thread; - +#[cfg(feature = "executor")] // executor:: #[test] fn lots() { + use futures::executor::block_on; + use futures::future::{self, FutureExt, BoxFuture}; + use std::sync::mpsc; + use std::thread; + fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> { let (n, x) = input; if n == 0 { diff --git a/tests/select_all.rs b/tests/select_all.rs index aad977d..9a6d736 100644 --- a/tests/select_all.rs +++ b/tests/select_all.rs @@ -1,9 +1,10 @@ -use futures::executor::block_on; -use futures::future::{ready, select_all}; -use std::collections::HashSet; - +#[cfg(feature = "executor")] // executor:: #[test] fn smoke() { + use futures::executor::block_on; + use futures::future::{ready, select_all}; + use std::collections::HashSet; + let v = vec![ ready(1), ready(2), diff --git a/tests/select_ok.rs b/tests/select_ok.rs index db88a95..dd3703e 100644 --- a/tests/select_ok.rs +++ b/tests/select_ok.rs @@ -1,8 +1,9 @@ -use futures::executor::block_on; -use futures::future::{err, ok, select_ok}; - +#[cfg(feature = "executor")] // executor:: #[test] fn ignore_err() { + use futures::executor::block_on; + use futures::future::{err, ok, select_ok}; + let v = vec![ err(1), err(2), @@ -21,8 +22,12 @@ fn ignore_err() { assert!(v.is_empty()); } +#[cfg(feature = "executor")] // executor:: #[test] fn last_err() { + use futures::executor::block_on; + use futures::future::{err, ok, select_ok}; + let v = vec![ ok(1), err(2), diff --git a/tests/shared.rs b/tests/shared.rs index 8402bfe..21e80fe 100644 --- a/tests/shared.rs +++ b/tests/shared.rs @@ -1,12 +1,23 @@ -use futures::channel::oneshot; -use futures::executor::{block_on, LocalPool}; -use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj}; -use futures::task::LocalSpawn; -use std::cell::{Cell, RefCell}; -use std::rc::Rc; -use std::thread; +mod count_clone { + use std::cell::Cell; + use std::rc::Rc; + pub struct CountClone(pub Rc>); + + impl Clone for CountClone { + fn clone(&self) -> Self { + self.0.set(self.0.get() + 1); + CountClone(self.0.clone()) + } + } +} + +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::FutureExt; + use std::thread; let (tx, rx) = oneshot::channel::(); let f = rx.shared(); let join_handles = (0..threads_number) @@ -26,23 +37,32 @@ fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { } } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn one_thread() { send_shared_oneshot_and_wait_on_multiple_threads(1); } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn two_threads() { send_shared_oneshot_and_wait_on_multiple_threads(2); } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn many_threads() { send_shared_oneshot_and_wait_on_multiple_threads(1000); } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn drop_on_one_task_ok() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::{self, FutureExt, TryFutureExt}; + use std::thread; + let (tx, rx) = oneshot::channel::(); let f1 = rx.shared(); let f2 = f1.clone(); @@ -69,15 +89,22 @@ fn drop_on_one_task_ok() { t2.join().unwrap(); } +#[cfg(feature = "executor")] // executor:: #[test] fn drop_in_poll() { + use futures::executor::block_on; + use futures::future::{self, FutureExt, LocalFutureObj}; + use std::cell::RefCell; + use std::rc::Rc; + let slot1 = Rc::new(RefCell::new(None)); let slot2 = slot1.clone(); let future1 = future::lazy(move |_| { slot2.replace(None); // Drop future 1 - }).shared(); + }) + .shared(); let future2 = LocalFutureObj::new(Box::new(future1.clone())); slot1.replace(Some(future2)); @@ -85,8 +112,14 @@ fn drop_in_poll() { assert_eq!(block_on(future1), 1); } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn peek() { + use futures::channel::oneshot; + use futures::executor::LocalPool; + use futures::future::{FutureExt, LocalFutureObj}; + use futures::task::LocalSpawn; + let mut local_pool = LocalPool::new(); let spawn = &mut local_pool.spawner(); @@ -108,24 +141,26 @@ fn peek() { } // Once the Shared has been polled, the value is peekable on the clone. - spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap(); + spawn + .spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))) + .unwrap(); local_pool.run(); for _ in 0..2 { assert_eq!(*f2.peek().unwrap(), Ok(42)); } } -struct CountClone(Rc>); - -impl Clone for CountClone { - fn clone(&self) -> Self { - self.0.set(self.0.get() + 1); - CountClone(self.0.clone()) - } -} - +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn dont_clone_in_single_owner_shared_future() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::FutureExt; + use std::cell::Cell; + use std::rc::Rc; + + use count_clone::CountClone; + let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -136,8 +171,17 @@ fn dont_clone_in_single_owner_shared_future() { assert_eq!(block_on(rx).unwrap().0.get(), 0); } +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: #[test] fn dont_do_unnecessary_clones_on_output() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::FutureExt; + use std::cell::Cell; + use std::rc::Rc; + + use count_clone::CountClone; + let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -149,3 +193,30 @@ fn dont_do_unnecessary_clones_on_output() { assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2); assert_eq!(block_on(rx).unwrap().0.get(), 2); } + +#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor:: +#[test] +fn shared_future_that_wakes_itself_until_pending_is_returned() { + use futures::executor::block_on; + use futures::future::FutureExt; + use std::cell::Cell; + use std::task::Poll; + + let proceed = Cell::new(false); + let fut = futures::future::poll_fn(|cx| { + if proceed.get() { + Poll::Ready(()) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .shared(); + + // The join future can only complete if the second future gets a chance to run after the first + // has returned pending + assert_eq!( + block_on(futures::future::join(fut, async { proceed.set(true) })), + ((), ()) + ); +} diff --git a/tests/sink.rs b/tests/sink.rs index f967e1b..f6ce28c 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -1,43 +1,258 @@ -use futures::channel::{mpsc, oneshot}; -use futures::executor::block_on; -use futures::future::{self, Future, FutureExt, TryFutureExt}; -use futures::never::Never; -use futures::ready; -use futures::sink::{Sink, SinkErrInto, SinkExt}; -use futures::stream::{self, Stream, StreamExt}; -use futures::task::{self, ArcWake, Context, Poll, Waker}; -use futures_test::task::panic_context; -use std::cell::{Cell, RefCell}; -use std::collections::VecDeque; -use std::fmt; -use std::mem; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; - -fn sassert_next(s: &mut S, item: S::Item) -where - S: Stream + Unpin, - S::Item: Eq + fmt::Debug, -{ - match s.poll_next_unpin(&mut panic_context()) { - Poll::Ready(None) => panic!("stream is at its end"), - Poll::Ready(Some(e)) => assert_eq!(e, item), - Poll::Pending => panic!("stream wasn't ready"), +#[allow(dead_code)] +mod sassert_next { + use futures::stream::{Stream, StreamExt}; + use futures::task::Poll; + use futures_test::task::panic_context; + use std::fmt; + + pub fn sassert_next(s: &mut S, item: S::Item) + where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, + { + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), + } + } +} + +#[allow(dead_code)] +mod unwrap { + use futures::task::Poll; + use std::fmt; + + pub fn unwrap(x: Poll>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), + } + } +} + +#[allow(dead_code)] +#[cfg(feature = "alloc")] // ArcWake +mod flag_cx { + use futures::task::{self, ArcWake, Context}; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + + // An Unpark struct that records unpark events for inspection + pub struct Flag(AtomicBool); + + impl Flag { + pub fn new() -> Arc { + Arc::new(Self(AtomicBool::new(false))) + } + + pub fn take(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) + } + + pub fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) + } + } + + impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc) { + arc_self.set(true) + } + } + + pub fn flag_cx(f: F) -> R + where + F: FnOnce(Arc, &mut Context<'_>) -> R, + { + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) + } +} + +#[allow(dead_code)] +mod start_send_fut { + use futures::future::Future; + use futures::ready; + use futures::sink::Sink; + use futures::task::{Context, Poll}; + use std::pin::Pin; + + // Sends a value on an i32 channel sink + pub struct StartSendFut + Unpin, Item: Unpin>(Option, Option); + + impl + Unpin, Item: Unpin> StartSendFut { + pub fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) + } + } + + impl + Unpin, Item: Unpin> Future for StartSendFut { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; + } + Poll::Ready(Ok(inner.take().unwrap())) + } + } +} + +#[allow(dead_code)] +mod manual_flush { + use futures::sink::Sink; + use futures::task::{Context, Poll, Waker}; + use std::mem; + use std::pin::Pin; + + // Immediately accepts all requests to start pushing, but completion is managed + // by manually flushing + pub struct ManualFlush { + data: Vec, + waiting_tasks: Vec, + } + + impl Sink> for ManualFlush { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: Option) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); + } + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } + } + + impl ManualFlush { + pub fn new() -> Self { + Self { + data: Vec::new(), + waiting_tasks: Vec::new(), + } + } + + pub fn force_flush(&mut self) -> Vec { + for task in self.waiting_tasks.drain(..) { + task.wake() + } + mem::replace(&mut self.data, Vec::new()) + } } } -fn unwrap(x: Poll>) -> T { - match x { - Poll::Ready(Ok(x)) => x, - Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), - Poll::Pending => panic!("Poll::Pending"), +#[allow(dead_code)] +mod allowance { + use futures::sink::Sink; + use futures::task::{Context, Poll, Waker}; + use std::cell::{Cell, RefCell}; + use std::pin::Pin; + use std::rc::Rc; + + pub struct ManualAllow { + pub data: Vec, + allow: Rc, + } + + pub struct Allow { + flag: Cell, + tasks: RefCell>, + } + + impl Allow { + pub fn new() -> Self { + Self { + flag: Cell::new(false), + tasks: RefCell::new(Vec::new()), + } + } + + pub fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false + } + } + + pub fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); + } + } + } + + impl Sink for ManualAllow { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.allow.check(cx) { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } + + pub fn manual_allow() -> (ManualAllow, Rc) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { + data: Vec::new(), + allow: allow.clone(), + }; + (manual_allow, allow) } } + +#[cfg(feature = "alloc")] // futures_sink::Sink satisfying for .left/right_sink #[test] fn either_sink() { + use futures::sink::{Sink, SinkExt}; + use std::collections::VecDeque; + use std::pin::Pin; + let mut s = if true { Vec::::new().left_sink() } else { @@ -47,8 +262,13 @@ fn either_sink() { Pin::new(&mut s).start_send(0).unwrap(); } +#[cfg(feature = "executor")] // executor:: #[test] fn vec_sink() { + use futures::executor::block_on; + use futures::sink::{Sink, SinkExt}; + use std::pin::Pin; + let mut v = Vec::new(); Pin::new(&mut v).start_send(0).unwrap(); Pin::new(&mut v).start_send(1).unwrap(); @@ -57,8 +277,13 @@ fn vec_sink() { assert_eq!(v, vec![0, 1]); } +#[cfg(feature = "alloc")] // futures_sink::Sink satisfying for .start_send() #[test] fn vecdeque_sink() { + use futures::sink::Sink; + use std::collections::VecDeque; + use std::pin::Pin; + let mut deque = VecDeque::new(); Pin::new(&mut deque).start_send(2).unwrap(); Pin::new(&mut deque).start_send(3).unwrap(); @@ -68,8 +293,12 @@ fn vecdeque_sink() { assert_eq!(deque.pop_front(), None); } +#[cfg(feature = "executor")] // executor:: #[test] fn send() { + use futures::executor::block_on; + use futures::sink::SinkExt; + let mut v = Vec::new(); block_on(v.send(0)).unwrap(); @@ -82,8 +311,13 @@ fn send() { assert_eq!(v, vec![0, 1, 2]); } +#[cfg(feature = "executor")] // executor:: #[test] fn send_all() { + use futures::executor::block_on; + use futures::sink::SinkExt; + use futures::stream::{self, StreamExt}; + let mut v = Vec::new(); block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); @@ -96,66 +330,20 @@ fn send_all() { assert_eq!(v, vec![0, 1, 2, 3, 4, 5]); } -// An Unpark struct that records unpark events for inspection -struct Flag(AtomicBool); - -impl Flag { - fn new() -> Arc { - Arc::new(Self(AtomicBool::new(false))) - } - - fn take(&self) -> bool { - self.0.swap(false, Ordering::SeqCst) - } - - fn set(&self, v: bool) { - self.0.store(v, Ordering::SeqCst) - } -} - -impl ArcWake for Flag { - fn wake_by_ref(arc_self: &Arc) { - arc_self.set(true) - } -} - -fn flag_cx(f: F) -> R -where - F: FnOnce(Arc, &mut Context<'_>) -> R, -{ - let flag = Flag::new(); - let waker = task::waker_ref(&flag); - let cx = &mut Context::from_waker(&waker); - f(flag.clone(), cx) -} - -// Sends a value on an i32 channel sink -struct StartSendFut + Unpin, Item: Unpin>(Option, Option); - -impl + Unpin, Item: Unpin> StartSendFut { - fn new(sink: S, item: Item) -> Self { - Self(Some(sink), Some(item)) - } -} - -impl + Unpin, Item: Unpin> Future for StartSendFut { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self(inner, item) = self.get_mut(); - { - let mut inner = inner.as_mut().unwrap(); - ready!(Pin::new(&mut inner).poll_ready(cx))?; - Pin::new(&mut inner).start_send(item.take().unwrap())?; - } - Poll::Ready(Ok(inner.take().unwrap())) - } -} - // Test that `start_send` on an `mpsc` channel does indeed block when the // channel is full +#[cfg(feature = "executor")] // executor:: #[test] fn mpsc_blocking_start_send() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::{self, FutureExt}; + + use start_send_fut::StartSendFut; + use flag_cx::flag_cx; + use sassert_next::sassert_next; + use unwrap::unwrap; + let (mut tx, mut rx) = mpsc::channel::(0); block_on(future::lazy(|_| { @@ -177,8 +365,20 @@ fn mpsc_blocking_start_send() { // test `flush` by using `with` to make the first insertion into a sink block // until a oneshot is completed +#[cfg(feature = "executor")] // executor:: #[test] fn with_flush() { + use futures::channel::oneshot; + use futures::executor::block_on; + use futures::future::{self, FutureExt, TryFutureExt}; + use futures::never::Never; + use futures::sink::{Sink, SinkExt}; + use std::mem; + use std::pin::Pin; + + use flag_cx::flag_cx; + use unwrap::unwrap; + let (tx, rx) = oneshot::channel(); let mut block = rx.boxed(); let mut sink = Vec::new().with(|elem| { @@ -203,8 +403,14 @@ fn with_flush() { } // test simple use of with to change data +#[cfg(feature = "executor")] // executor:: #[test] fn with_as_map() { + use futures::executor::block_on; + use futures::future; + use futures::never::Never; + use futures::sink::SinkExt; + let mut sink = Vec::new().with(|item| future::ok::(item * 2)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -213,8 +419,13 @@ fn with_as_map() { } // test simple use of with_flat_map +#[cfg(feature = "executor")] // executor:: #[test] fn with_flat_map() { + use futures::executor::block_on; + use futures::sink::SinkExt; + use futures::stream::{self, StreamExt}; + let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -225,8 +436,19 @@ fn with_flat_map() { // Check that `with` propagates `poll_ready` to the inner sink. // Regression test for the issue #1834. +#[cfg(feature = "executor")] // executor:: #[test] fn with_propagates_poll_ready() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future; + use futures::sink::{Sink, SinkExt}; + use futures::task::Poll; + use std::pin::Pin; + + use flag_cx::flag_cx; + use sassert_next::sassert_next; + let (tx, mut rx) = mpsc::channel::(0); let mut tx = tx.with(|item: i32| future::ok::(item + 10)); @@ -249,63 +471,19 @@ fn with_propagates_poll_ready() { })); } -// Immediately accepts all requests to start pushing, but completion is managed -// by manually flushing -struct ManualFlush { - data: Vec, - waiting_tasks: Vec, -} - -impl Sink> for ManualFlush { - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn start_send(mut self: Pin<&mut Self>, item: Option) -> Result<(), Self::Error> { - if let Some(item) = item { - self.data.push(item); - } else { - self.force_flush(); - } - Ok(()) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.data.is_empty() { - Poll::Ready(Ok(())) - } else { - self.waiting_tasks.push(cx.waker().clone()); - Poll::Pending - } - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_flush(cx) - } -} - -impl ManualFlush { - fn new() -> Self { - Self { - data: Vec::new(), - waiting_tasks: Vec::new(), - } - } - - fn force_flush(&mut self) -> Vec { - for task in self.waiting_tasks.drain(..) { - task.wake() - } - mem::replace(&mut self.data, Vec::new()) - } -} - // test that the `with` sink doesn't require the underlying sink to flush, // but doesn't claim to be flushed until the underlying sink is +#[cfg(feature = "alloc")] // flag_cx #[test] fn with_flush_propagate() { + use futures::future::{self, FutureExt}; + use futures::sink::{Sink, SinkExt}; + use std::pin::Pin; + + use manual_flush::ManualFlush; + use flag_cx::flag_cx; + use unwrap::unwrap; + let mut sink = ManualFlush::new().with(future::ok::, ()>); flag_cx(|flag, cx| { unwrap(Pin::new(&mut sink).poll_ready(cx)); @@ -325,8 +503,12 @@ fn with_flush_propagate() { } // test that a buffer is a no-nop around a sink that always accepts sends +#[cfg(feature = "executor")] // executor:: #[test] fn buffer_noop() { + use futures::executor::block_on; + use futures::sink::SinkExt; + let mut sink = Vec::new().buffer(0); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -338,80 +520,21 @@ fn buffer_noop() { assert_eq!(sink.get_ref(), &[0, 1]); } -struct ManualAllow { - data: Vec, - allow: Rc, -} - -struct Allow { - flag: Cell, - tasks: RefCell>, -} - -impl Allow { - fn new() -> Self { - Self { - flag: Cell::new(false), - tasks: RefCell::new(Vec::new()), - } - } - - fn check(&self, cx: &mut Context<'_>) -> bool { - if self.flag.get() { - true - } else { - self.tasks.borrow_mut().push(cx.waker().clone()); - false - } - } - - fn start(&self) { - self.flag.set(true); - let mut tasks = self.tasks.borrow_mut(); - for task in tasks.drain(..) { - task.wake(); - } - } -} - -impl Sink for ManualAllow { - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.allow.check(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.data.push(item); - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } -} - -fn manual_allow() -> (ManualAllow, Rc) { - let allow = Rc::new(Allow::new()); - let manual_allow = ManualAllow { - data: Vec::new(), - allow: allow.clone(), - }; - (manual_allow, allow) -} - // test basic buffer functionality, including both filling up to capacity, // and writing out when the underlying sink is ready +#[cfg(feature = "executor")] // executor:: +#[cfg(feature = "alloc")] // flag_cx #[test] fn buffer() { + use futures::executor::block_on; + use futures::future::FutureExt; + use futures::sink::SinkExt; + + use start_send_fut::StartSendFut; + use flag_cx::flag_cx; + use unwrap::unwrap; + use allowance::manual_allow; + let (sink, allow) = manual_allow::(); let sink = sink.buffer(2); @@ -429,8 +552,13 @@ fn buffer() { }) } +#[cfg(feature = "executor")] // executor:: #[test] fn fanout_smoke() { + use futures::executor::block_on; + use futures::sink::SinkExt; + use futures::stream::{self, StreamExt}; + let sink1 = Vec::new(); let sink2 = Vec::new(); let mut sink = sink1.fanout(sink2); @@ -440,8 +568,20 @@ fn fanout_smoke() { assert_eq!(sink2, vec![1, 2, 3]); } +#[cfg(feature = "executor")] // executor:: +#[cfg(feature = "alloc")] // flag_cx #[test] fn fanout_backpressure() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::FutureExt; + use futures::sink::SinkExt; + use futures::stream::StreamExt; + + use start_send_fut::StartSendFut; + use flag_cx::flag_cx; + use unwrap::unwrap; + let (left_send, mut left_recv) = mpsc::channel(0); let (right_send, mut right_recv) = mpsc::channel(0); let sink = left_send.fanout(right_send); @@ -472,8 +612,15 @@ fn fanout_backpressure() { }) } +#[cfg(all(feature = "alloc", feature = "std"))] // channel::mpsc #[test] fn sink_map_err() { + use futures::channel::mpsc; + use futures::sink::{Sink, SinkExt}; + use futures::task::Poll; + use futures_test::task::panic_context; + use std::pin::Pin; + { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); @@ -489,17 +636,24 @@ fn sink_map_err() { ); } -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -struct ErrIntoTest; - -impl From for ErrIntoTest { - fn from(_: mpsc::SendError) -> Self { - Self - } -} - +#[cfg(all(feature = "alloc", feature = "std"))] // channel::mpsc #[test] fn err_into() { + use futures::channel::mpsc; + use futures::sink::{Sink, SinkErrInto, SinkExt}; + use futures::task::Poll; + use futures_test::task::panic_context; + use std::pin::Pin; + + #[derive(Copy, Clone, Debug, PartialEq, Eq)] + pub struct ErrIntoTest; + + impl From for ErrIntoTest { + fn from(_: mpsc::SendError) -> Self { + Self + } + } + { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); diff --git a/tests/sink_fanout.rs b/tests/sink_fanout.rs index e57b2d8..62f32f2 100644 --- a/tests/sink_fanout.rs +++ b/tests/sink_fanout.rs @@ -1,11 +1,12 @@ -use futures::channel::mpsc; -use futures::executor::block_on; -use futures::future::join3; -use futures::sink::SinkExt; -use futures::stream::{self, StreamExt}; - +#[cfg(all(feature = "alloc", feature="std", feature="executor"))] // channel::mpsc, executor:: #[test] fn it_works() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::join3; + use futures::sink::SinkExt; + use futures::stream::{self, StreamExt}; + let (tx1, rx1) = mpsc::channel(1); let (tx2, rx2) = mpsc::channel(2); let tx = tx1.fanout(tx2).sink_map_err(|_| ()); diff --git a/tests/split.rs b/tests/split.rs index 9f4f1a0..140cf3c 100644 --- a/tests/split.rs +++ b/tests/split.rs @@ -1,65 +1,66 @@ -use futures::executor::block_on; -use futures::sink::{Sink, SinkExt}; -use futures::stream::{self, Stream, StreamExt}; -use futures::task::{Context, Poll}; -use pin_utils::unsafe_pinned; -use std::pin::Pin; +#[cfg(feature = "executor")] // executor:: +#[test] +fn test_split() { + use futures::executor::block_on; + use futures::sink::{Sink, SinkExt}; + use futures::stream::{self, Stream, StreamExt}; + use futures::task::{Context, Poll}; + use pin_utils::unsafe_pinned; + use std::pin::Pin; -struct Join { - stream: T, - sink: U -} + struct Join { + stream: T, + sink: U + } -impl Join { - unsafe_pinned!(stream: T); - unsafe_pinned!(sink: U); -} + impl Join { + unsafe_pinned!(stream: T); + unsafe_pinned!(sink: U); + } -impl Stream for Join { - type Item = T::Item; + impl Stream for Join { + type Item = T::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.stream().poll_next(cx) + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.stream().poll_next(cx) + } } -} -impl, Item> Sink for Join { - type Error = U::Error; + impl, Item> Sink for Join { + type Error = U::Error; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.sink().poll_ready(cx) - } + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.sink().poll_ready(cx) + } - fn start_send( - self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { - self.sink().start_send(item) - } + fn start_send( + self: Pin<&mut Self>, + item: Item, + ) -> Result<(), Self::Error> { + self.sink().start_send(item) + } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.sink().poll_flush(cx) - } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.sink().poll_flush(cx) + } - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.sink().poll_close(cx) + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.sink().poll_close(cx) + } } -} -#[test] -fn test_split() { let mut dest: Vec = Vec::new(); { let join = Join { diff --git a/tests/stream.rs b/tests/stream.rs index fd6a8b6..09fe9e2 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -1,8 +1,9 @@ -use futures::executor::block_on; -use futures::stream::{self, StreamExt}; - +#[cfg(feature = "executor")] // executor:: #[test] fn select() { + use futures::executor::block_on; + use futures::stream::{self, StreamExt}; + fn select_and_compare(a: Vec, b: Vec, expected: Vec) { let a = stream::iter(a); let b = stream::iter(b); @@ -15,14 +16,38 @@ fn select() { select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]); } +#[cfg(feature = "executor")] // executor:: +#[test] +fn flat_map() { + use futures::stream::{self, StreamExt}; + + futures::executor::block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(0..=2), + ]); + + let values: Vec<_> = st + .flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))) + .collect() + .await; + + assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]); + }); +} + +#[cfg(feature = "executor")] // executor:: #[test] fn scan() { + use futures::stream::{self, StreamExt}; + futures::executor::block_on(async { assert_eq!( stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) - .scan(1, |acc, e| { - *acc += 1; - futures::future::ready(if e < *acc { Some(e) } else { None }) + .scan(1, |state, e| { + *state += 1; + futures::future::ready(if e < *state { Some(e) } else { None }) }) .collect::>() .await, @@ -30,3 +55,111 @@ fn scan() { ); }); } + +#[cfg(feature = "executor")] // executor:: +#[test] +fn take_until() { + use futures::future::{self, Future}; + use futures::stream::{self, StreamExt}; + use futures::task::Poll; + + fn make_stop_fut(stop_on: u32) -> impl Future { + let mut i = 0; + future::poll_fn(move |_cx| { + i += 1; + if i <= stop_on { + Poll::Pending + } else { + Poll::Ready(()) + } + }) + } + + futures::executor::block_on(async { + // Verify stopping works: + let stream = stream::iter(1u32..=10); + let stop_fut = make_stop_fut(5); + + let stream = stream.take_until(stop_fut); + let last = stream.fold(0, |_, i| async move { i }).await; + assert_eq!(last, 5); + + // Verify take_future() works: + let stream = stream::iter(1..=10); + let stop_fut = make_stop_fut(5); + + let mut stream = stream.take_until(stop_fut); + + assert_eq!(stream.next().await, Some(1)); + assert_eq!(stream.next().await, Some(2)); + + stream.take_future(); + + let last = stream.fold(0, |_, i| async move { i }).await; + assert_eq!(last, 10); + + // Verify take_future() returns None if stream is stopped: + let stream = stream::iter(1u32..=10); + let stop_fut = make_stop_fut(1); + let mut stream = stream.take_until(stop_fut); + assert_eq!(stream.next().await, Some(1)); + assert_eq!(stream.next().await, None); + assert!(stream.take_future().is_none()); + + // Verify TakeUntil is fused: + let mut i = 0; + let stream = stream::poll_fn(move |_cx| { + i += 1; + match i { + 1 => Poll::Ready(Some(1)), + 2 => Poll::Ready(None), + _ => panic!("TakeUntil not fused"), + } + }); + + let stop_fut = make_stop_fut(1); + let mut stream = stream.take_until(stop_fut); + assert_eq!(stream.next().await, Some(1)); + assert_eq!(stream.next().await, None); + assert_eq!(stream.next().await, None); + }); +} + +#[test] +#[should_panic] +fn ready_chunks_panic_on_cap_zero() { + use futures::channel::mpsc; + use futures::stream::StreamExt; + + let (_, rx1) = mpsc::channel::<()>(1); + + let _ = rx1.ready_chunks(0); +} + +#[cfg(feature = "executor")] // executor:: +#[test] +fn ready_chunks() { + use futures::channel::mpsc; + use futures::stream::StreamExt; + use futures::sink::SinkExt; + use futures::FutureExt; + use futures_test::task::noop_context; + + let (mut tx, rx1) = mpsc::channel::(16); + + let mut s = rx1.ready_chunks(2); + + let mut cx = noop_context(); + assert!(s.next().poll_unpin(&mut cx).is_pending()); + + futures::executor::block_on(async { + tx.send(1).await.unwrap(); + + assert_eq!(s.next().await.unwrap(), vec![1]); + tx.send(2).await.unwrap(); + tx.send(3).await.unwrap(); + tx.send(4).await.unwrap(); + assert_eq!(s.next().await.unwrap(), vec![2,3]); + assert_eq!(s.next().await.unwrap(), vec![4]); + }); +} \ No newline at end of file diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs index 8b23a0a..94c7a75 100644 --- a/tests/stream_catch_unwind.rs +++ b/tests/stream_catch_unwind.rs @@ -1,8 +1,9 @@ -use futures::executor::block_on_stream; -use futures::stream::{self, StreamExt}; - +#[cfg(feature = "executor")] #[test] fn panic_in_the_middle_of_the_stream() { + use futures::executor::block_on_stream; + use futures::stream::{self, StreamExt}; + let stream = stream::iter(vec![Some(10), None, Some(11)]); // panic on second element @@ -14,8 +15,12 @@ fn panic_in_the_middle_of_the_stream() { assert!(iter.next().is_none()); } +#[cfg(feature = "executor")] #[test] fn no_panic() { + use futures::executor::block_on_stream; + use futures::stream::{self, StreamExt}; + let stream = stream::iter(vec![10, 11, 12]); let mut iter = block_on_stream(stream.catch_unwind()); diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs index c528af0..1b26233 100644 --- a/tests/stream_into_async_read.rs +++ b/tests/stream_into_async_read.rs @@ -1,51 +1,32 @@ -use core::pin::Pin; -use futures::io::{AsyncRead, AsyncBufRead}; -use futures::stream::{self, TryStreamExt}; -use futures::task::Poll; -use futures_test::{task::noop_context, stream::StreamTestExt}; - -macro_rules! assert_read { - ($reader:expr, $buf:expr, $item:expr) => { - let mut cx = noop_context(); - loop { - match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { - Poll::Ready(Ok(x)) => { - assert_eq!(x, $item); - break; - } - Poll::Ready(Err(err)) => { - panic!("assertion failed: expected value but got {}", err); - } - Poll::Pending => { - continue; - } - } - } - }; -} - -macro_rules! assert_fill_buf { - ($reader:expr, $buf:expr) => { - let mut cx = noop_context(); - loop { - match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { - Poll::Ready(Ok(x)) => { - assert_eq!(x, $buf); - break; - } - Poll::Ready(Err(err)) => { - panic!("assertion failed: expected value but got {}", err); - } - Poll::Pending => { - continue; +#[cfg(feature = "std")] // io:: +#[test] +fn test_into_async_read() { + use core::pin::Pin; + use futures::io::AsyncRead; + use futures::stream::{self, TryStreamExt}; + use futures::task::Poll; + use futures_test::{task::noop_context, stream::StreamTestExt}; + + macro_rules! assert_read { + ($reader:expr, $buf:expr, $item:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $item); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } } } - } - }; -} + }; + } -#[test] -fn test_into_async_read() { let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); let mut reader = stream.interleave_pending().into_async_read(); let mut buf = vec![0; 3]; @@ -71,8 +52,35 @@ fn test_into_async_read() { assert_read!(reader, &mut buf, 0); } +#[cfg(feature = "std")] // io:: #[test] fn test_into_async_bufread() -> std::io::Result<()> { + use core::pin::Pin; + use futures::io::AsyncBufRead; + use futures::stream::{self, TryStreamExt}; + use futures::task::Poll; + use futures_test::{task::noop_context, stream::StreamTestExt}; + + macro_rules! assert_fill_buf { + ($reader:expr, $buf:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $buf); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } + } + } + }; + } + let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); let mut reader = stream.interleave_pending().into_async_read(); diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs index b65a057..c49cd72 100644 --- a/tests/stream_peekable.rs +++ b/tests/stream_peekable.rs @@ -1,9 +1,10 @@ -use futures::executor::block_on; -use futures::pin_mut; -use futures::stream::{self, Peekable, StreamExt}; - +#[cfg(feature = "executor")] // executor:: #[test] fn peekable() { + use futures::executor::block_on; + use futures::pin_mut; + use futures::stream::{self, Peekable, StreamExt}; + block_on(async { let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable(); pin_mut!(peekable); diff --git a/tests/stream_select_all.rs b/tests/stream_select_all.rs index eb711dd..411cb73 100644 --- a/tests/stream_select_all.rs +++ b/tests/stream_select_all.rs @@ -1,12 +1,11 @@ -use futures::channel::mpsc; -use futures::executor::block_on_stream; -use futures::future::{self, FutureExt}; -use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; -use futures::task::Poll; -use futures_test::task::noop_context; - +#[cfg(feature = "alloc")] // stream::SelectAll #[test] fn is_terminated() { + use futures::future::{self, FutureExt}; + use futures::stream::{FusedStream, SelectAll, StreamExt}; + use futures::task::Poll; + use futures_test::task::noop_context; + let mut cx = noop_context(); let mut tasks = SelectAll::new(); @@ -30,8 +29,12 @@ fn is_terminated() { assert_eq!(tasks.is_terminated(), true); } +#[cfg(feature = "executor")] // executor:: #[test] fn issue_1626() { + use futures::executor::block_on_stream; + use futures::stream; + let a = stream::iter(0..=2); let b = stream::iter(10..=14); @@ -48,8 +51,14 @@ fn issue_1626() { assert_eq!(s.next(), None); } +#[cfg(all(feature = "alloc", feature = "std"))] // channel::mpsc +#[cfg(feature = "executor")] // executor:: #[test] fn works_1() { + use futures::channel::mpsc; + use futures::executor::block_on_stream; + use futures::stream::select_all; + let (a_tx, a_rx) = mpsc::unbounded::(); let (b_tx, b_rx) = mpsc::unbounded::(); let (c_tx, c_rx) = mpsc::unbounded::(); diff --git a/tests/stream_select_next_some.rs b/tests/stream_select_next_some.rs index 09d7e89..f2b3af2 100644 --- a/tests/stream_select_next_some.rs +++ b/tests/stream_select_next_some.rs @@ -1,12 +1,12 @@ -use futures::{future, select}; -use futures::future::{FusedFuture, FutureExt}; -use futures::stream::{FuturesUnordered, StreamExt}; -use futures::task::{Context, Poll}; -use futures_test::future::FutureTestExt; -use futures_test::task::new_count_waker; - +#[cfg(feature = "alloc")] // stream::FuturesUnordered #[test] fn is_terminated() { + use futures::future; + use futures::future::{FusedFuture, FutureExt}; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures::task::{Context, Poll}; + use futures_test::task::new_count_waker; + let (waker, counter) = new_count_waker(); let mut cx = Context::from_waker(&waker); @@ -29,8 +29,14 @@ fn is_terminated() { assert_eq!(select_next_some.is_terminated(), true); } +#[cfg(all(feature = "async-await", feature = "std"))] // futures::select +#[cfg(feature = "executor")] // executor:: #[test] fn select() { + use futures::{future, select}; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures_test::future::FutureTestExt; + // Checks that even though `async_tasks` will yield a `None` and return // `is_terminated() == true` during the first poll, it manages to toggle // back to having items after a future is pushed into it during the second @@ -56,8 +62,15 @@ fn select() { } // Check that `select!` macro does not fail when importing from `futures_util`. +#[cfg(feature = "alloc")] // stream::FuturesUnordered +#[cfg(feature = "async-await")] // futures_util::select turned on +#[cfg(feature = "executor")] // executor:: #[test] fn futures_util_select() { + use futures::future; + use futures::stream::{FuturesUnordered, StreamExt}; + use futures_test::future::FutureTestExt; + use futures_util::select; // Checks that even though `async_tasks` will yield a `None` and return diff --git a/tests/try_join.rs b/tests/try_join.rs index 6c6d084..0861c1e 100644 --- a/tests/try_join.rs +++ b/tests/try_join.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "executor")] // executor:: +#![cfg(feature = "async-await")] // try_join! #![deny(unreachable_code)] use futures::{try_join, executor::block_on}; diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs index 662b866..1097a36 100644 --- a/tests/try_join_all.rs +++ b/tests/try_join_all.rs @@ -1,19 +1,26 @@ -use futures_util::future::*; -use std::future::Future; -use futures::executor::block_on; -use std::fmt::Debug; - -fn assert_done(actual_fut: F, expected: T) -where - T: PartialEq + Debug, - F: FnOnce() -> Box + Unpin>, -{ - let output = block_on(actual_fut()); - assert_eq!(output, expected); +#[cfg(feature = "executor")] // executor:: +mod util { + use std::future::Future; + use futures::executor::block_on; + use std::fmt::Debug; + + pub fn assert_done(actual_fut: F, expected: T) + where + T: PartialEq + Debug, + F: FnOnce() -> Box + Unpin>, + { + let output = block_on(actual_fut()); + assert_eq!(output, expected); + } } +#[cfg(feature = "executor")] // assert_done #[test] fn collect_collects() { + use futures_util::future::{err, ok, try_join_all}; + + use util::assert_done; + assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2])); assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2)); assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1])); @@ -23,8 +30,14 @@ fn collect_collects() { // TODO: needs more tests } +#[cfg(feature = "executor")] // assert_done #[test] fn try_join_all_iter_lifetime() { + use futures_util::future::{ok, try_join_all}; + use std::future::Future; + + use util::assert_done; + // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `TryJoinAll`. fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box, ()>> + Unpin> { @@ -35,8 +48,13 @@ fn try_join_all_iter_lifetime() { assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3 as usize, 0, 1])); } +#[cfg(feature = "executor")] // assert_done #[test] fn try_join_all_from_iter() { + use futures_util::future::{ok, TryJoinAll}; + + use util::assert_done; + assert_done( || Box::new(vec![ok(1), ok(2)].into_iter().collect::>()), Ok::<_, usize>(vec![1, 2]), -- cgit v1.2.3