aboutsummaryrefslogtreecommitdiff
path: root/tests/shared.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/shared.rs')
-rw-r--r--tests/shared.rs107
1 files changed, 89 insertions, 18 deletions
diff --git a/tests/shared.rs b/tests/shared.rs
index 8402bfe..21e80fe 100644
--- a/tests/shared.rs
+++ b/tests/shared.rs
@@ -1,12 +1,23 @@
-use futures::channel::oneshot;
-use futures::executor::{block_on, LocalPool};
-use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj};
-use futures::task::LocalSpawn;
-use std::cell::{Cell, RefCell};
-use std::rc::Rc;
-use std::thread;
+mod count_clone {
+ use std::cell::Cell;
+ use std::rc::Rc;
+ pub struct CountClone(pub Rc<Cell<i32>>);
+
+ impl Clone for CountClone {
+ fn clone(&self) -> Self {
+ self.0.set(self.0.get() + 1);
+ CountClone(self.0.clone())
+ }
+ }
+}
+
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use std::thread;
let (tx, rx) = oneshot::channel::<i32>();
let f = rx.shared();
let join_handles = (0..threads_number)
@@ -26,23 +37,32 @@ fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
}
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn one_thread() {
send_shared_oneshot_and_wait_on_multiple_threads(1);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn two_threads() {
send_shared_oneshot_and_wait_on_multiple_threads(2);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn many_threads() {
send_shared_oneshot_and_wait_on_multiple_threads(1000);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn drop_on_one_task_ok() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::{self, FutureExt, TryFutureExt};
+ use std::thread;
+
let (tx, rx) = oneshot::channel::<u32>();
let f1 = rx.shared();
let f2 = f1.clone();
@@ -69,15 +89,22 @@ fn drop_on_one_task_ok() {
t2.join().unwrap();
}
+#[cfg(feature = "executor")] // executor::
#[test]
fn drop_in_poll() {
+ use futures::executor::block_on;
+ use futures::future::{self, FutureExt, LocalFutureObj};
+ use std::cell::RefCell;
+ use std::rc::Rc;
+
let slot1 = Rc::new(RefCell::new(None));
let slot2 = slot1.clone();
let future1 = future::lazy(move |_| {
slot2.replace(None); // Drop future
1
- }).shared();
+ })
+ .shared();
let future2 = LocalFutureObj::new(Box::new(future1.clone()));
slot1.replace(Some(future2));
@@ -85,8 +112,14 @@ fn drop_in_poll() {
assert_eq!(block_on(future1), 1);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn peek() {
+ use futures::channel::oneshot;
+ use futures::executor::LocalPool;
+ use futures::future::{FutureExt, LocalFutureObj};
+ use futures::task::LocalSpawn;
+
let mut local_pool = LocalPool::new();
let spawn = &mut local_pool.spawner();
@@ -108,24 +141,26 @@ fn peek() {
}
// Once the Shared has been polled, the value is peekable on the clone.
- spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap();
+ spawn
+ .spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ()))))
+ .unwrap();
local_pool.run();
for _ in 0..2 {
assert_eq!(*f2.peek().unwrap(), Ok(42));
}
}
-struct CountClone(Rc<Cell<i32>>);
-
-impl Clone for CountClone {
- fn clone(&self) -> Self {
- self.0.set(self.0.get() + 1);
- CountClone(self.0.clone())
- }
-}
-
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn dont_clone_in_single_owner_shared_future() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use std::cell::Cell;
+ use std::rc::Rc;
+
+ use count_clone::CountClone;
+
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -136,8 +171,17 @@ fn dont_clone_in_single_owner_shared_future() {
assert_eq!(block_on(rx).unwrap().0.get(), 0);
}
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn dont_do_unnecessary_clones_on_output() {
+ use futures::channel::oneshot;
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use std::cell::Cell;
+ use std::rc::Rc;
+
+ use count_clone::CountClone;
+
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -149,3 +193,30 @@ fn dont_do_unnecessary_clones_on_output() {
assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2);
assert_eq!(block_on(rx).unwrap().0.get(), 2);
}
+
+#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
+#[test]
+fn shared_future_that_wakes_itself_until_pending_is_returned() {
+ use futures::executor::block_on;
+ use futures::future::FutureExt;
+ use std::cell::Cell;
+ use std::task::Poll;
+
+ let proceed = Cell::new(false);
+ let fut = futures::future::poll_fn(|cx| {
+ if proceed.get() {
+ Poll::Ready(())
+ } else {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ })
+ .shared();
+
+ // The join future can only complete if the second future gets a chance to run after the first
+ // has returned pending
+ assert_eq!(
+ block_on(futures::future::join(fut, async { proceed.set(true) })),
+ ((), ())
+ );
+}