aboutsummaryrefslogtreecommitdiff
path: root/src/sync/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/tests')
-rw-r--r--src/sync/tests/loom_broadcast.rs2
-rw-r--r--src/sync/tests/loom_cancellation_token.rs155
-rw-r--r--src/sync/tests/loom_mpsc.rs71
-rw-r--r--src/sync/tests/loom_notify.rs12
-rw-r--r--src/sync/tests/loom_oneshot.rs6
-rw-r--r--src/sync/tests/loom_semaphore_ll.rs192
-rw-r--r--src/sync/tests/loom_watch.rs36
-rw-r--r--src/sync/tests/mod.rs5
-rw-r--r--src/sync/tests/semaphore_ll.rs470
9 files changed, 112 insertions, 837 deletions
diff --git a/src/sync/tests/loom_broadcast.rs b/src/sync/tests/loom_broadcast.rs
index da12fb9..4b1f034 100644
--- a/src/sync/tests/loom_broadcast.rs
+++ b/src/sync/tests/loom_broadcast.rs
@@ -1,5 +1,5 @@
use crate::sync::broadcast;
-use crate::sync::broadcast::RecvError::{Closed, Lagged};
+use crate::sync::broadcast::error::RecvError::{Closed, Lagged};
use loom::future::block_on;
use loom::sync::Arc;
diff --git a/src/sync/tests/loom_cancellation_token.rs b/src/sync/tests/loom_cancellation_token.rs
deleted file mode 100644
index e9c9f3d..0000000
--- a/src/sync/tests/loom_cancellation_token.rs
+++ /dev/null
@@ -1,155 +0,0 @@
-use crate::sync::CancellationToken;
-
-use loom::{future::block_on, thread};
-use tokio_test::assert_ok;
-
-#[test]
-fn cancel_token() {
- loom::model(|| {
- let token = CancellationToken::new();
- let token1 = token.clone();
-
- let th1 = thread::spawn(move || {
- block_on(async {
- token1.cancelled().await;
- });
- });
-
- let th2 = thread::spawn(move || {
- token.cancel();
- });
-
- assert_ok!(th1.join());
- assert_ok!(th2.join());
- });
-}
-
-#[test]
-fn cancel_with_child() {
- loom::model(|| {
- let token = CancellationToken::new();
- let token1 = token.clone();
- let token2 = token.clone();
- let child_token = token.child_token();
-
- let th1 = thread::spawn(move || {
- block_on(async {
- token1.cancelled().await;
- });
- });
-
- let th2 = thread::spawn(move || {
- token2.cancel();
- });
-
- let th3 = thread::spawn(move || {
- block_on(async {
- child_token.cancelled().await;
- });
- });
-
- assert_ok!(th1.join());
- assert_ok!(th2.join());
- assert_ok!(th3.join());
- });
-}
-
-#[test]
-fn drop_token_no_child() {
- loom::model(|| {
- let token = CancellationToken::new();
- let token1 = token.clone();
- let token2 = token.clone();
-
- let th1 = thread::spawn(move || {
- drop(token1);
- });
-
- let th2 = thread::spawn(move || {
- drop(token2);
- });
-
- let th3 = thread::spawn(move || {
- drop(token);
- });
-
- assert_ok!(th1.join());
- assert_ok!(th2.join());
- assert_ok!(th3.join());
- });
-}
-
-#[test]
-fn drop_token_with_childs() {
- loom::model(|| {
- let token1 = CancellationToken::new();
- let child_token1 = token1.child_token();
- let child_token2 = token1.child_token();
-
- let th1 = thread::spawn(move || {
- drop(token1);
- });
-
- let th2 = thread::spawn(move || {
- drop(child_token1);
- });
-
- let th3 = thread::spawn(move || {
- drop(child_token2);
- });
-
- assert_ok!(th1.join());
- assert_ok!(th2.join());
- assert_ok!(th3.join());
- });
-}
-
-#[test]
-fn drop_and_cancel_token() {
- loom::model(|| {
- let token1 = CancellationToken::new();
- let token2 = token1.clone();
- let child_token = token1.child_token();
-
- let th1 = thread::spawn(move || {
- drop(token1);
- });
-
- let th2 = thread::spawn(move || {
- token2.cancel();
- });
-
- let th3 = thread::spawn(move || {
- drop(child_token);
- });
-
- assert_ok!(th1.join());
- assert_ok!(th2.join());
- assert_ok!(th3.join());
- });
-}
-
-#[test]
-fn cancel_parent_and_child() {
- loom::model(|| {
- let token1 = CancellationToken::new();
- let token2 = token1.clone();
- let child_token = token1.child_token();
-
- let th1 = thread::spawn(move || {
- drop(token1);
- });
-
- let th2 = thread::spawn(move || {
- token2.cancel();
- });
-
- let th3 = thread::spawn(move || {
- child_token.cancel();
- });
-
- assert_ok!(th1.join());
- assert_ok!(th2.join());
- assert_ok!(th3.join());
- });
-}
diff --git a/src/sync/tests/loom_mpsc.rs b/src/sync/tests/loom_mpsc.rs
index 6a1a6ab..c12313b 100644
--- a/src/sync/tests/loom_mpsc.rs
+++ b/src/sync/tests/loom_mpsc.rs
@@ -2,22 +2,24 @@ use crate::sync::mpsc;
use futures::future::poll_fn;
use loom::future::block_on;
+use loom::sync::Arc;
use loom::thread;
+use tokio_test::assert_ok;
#[test]
fn closing_tx() {
loom::model(|| {
- let (mut tx, mut rx) = mpsc::channel(16);
+ let (tx, mut rx) = mpsc::channel(16);
thread::spawn(move || {
tx.try_send(()).unwrap();
drop(tx);
});
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_some());
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_none());
});
}
@@ -32,15 +34,70 @@ fn closing_unbounded_tx() {
drop(tx);
});
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_some());
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_none());
});
}
#[test]
+fn closing_bounded_rx() {
+ loom::model(|| {
+ let (tx1, rx) = mpsc::channel::<()>(16);
+ let tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
+fn closing_and_sending() {
+ loom::model(|| {
+ let (tx1, mut rx) = mpsc::channel::<()>(16);
+ let tx1 = Arc::new(tx1);
+ let tx2 = tx1.clone();
+
+ let th1 = thread::spawn(move || {
+ tx1.try_send(()).unwrap();
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(tx2.closed());
+ });
+
+ let th3 = thread::spawn(move || {
+ let v = block_on(rx.recv());
+ assert!(v.is_some());
+ drop(rx);
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ assert_ok!(th3.join());
+ });
+}
+
+#[test]
+fn closing_unbounded_rx() {
+ loom::model(|| {
+ let (tx1, rx) = mpsc::unbounded_channel::<()>();
+ let tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
fn dropping_tx() {
loom::model(|| {
let (tx, mut rx) = mpsc::channel::<()>(16);
@@ -53,7 +110,7 @@ fn dropping_tx() {
}
drop(tx);
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_none());
});
}
@@ -71,7 +128,7 @@ fn dropping_unbounded_tx() {
}
drop(tx);
- let v = block_on(poll_fn(|cx| rx.poll_recv(cx)));
+ let v = block_on(rx.recv());
assert!(v.is_none());
});
}
diff --git a/src/sync/tests/loom_notify.rs b/src/sync/tests/loom_notify.rs
index 60981d4..79a5bf8 100644
--- a/src/sync/tests/loom_notify.rs
+++ b/src/sync/tests/loom_notify.rs
@@ -16,7 +16,7 @@ fn notify_one() {
});
});
- tx.notify();
+ tx.notify_one();
th.join().unwrap();
});
}
@@ -34,12 +34,12 @@ fn notify_multi() {
ths.push(thread::spawn(move || {
block_on(async {
notify.notified().await;
- notify.notify();
+ notify.notify_one();
})
}));
}
- notify.notify();
+ notify.notify_one();
for th in ths.drain(..) {
th.join().unwrap();
@@ -67,7 +67,7 @@ fn notify_drop() {
block_on(poll_fn(|cx| {
if recv.as_mut().poll(cx).is_ready() {
- rx1.notify();
+ rx1.notify_one();
}
Poll::Ready(())
}));
@@ -77,12 +77,12 @@ fn notify_drop() {
block_on(async {
rx2.notified().await;
// Trigger second notification
- rx2.notify();
+ rx2.notify_one();
rx2.notified().await;
});
});
- notify.notify();
+ notify.notify_one();
th1.join().unwrap();
th2.join().unwrap();
diff --git a/src/sync/tests/loom_oneshot.rs b/src/sync/tests/loom_oneshot.rs
index dfa7459..9729cfb 100644
--- a/src/sync/tests/loom_oneshot.rs
+++ b/src/sync/tests/loom_oneshot.rs
@@ -75,8 +75,10 @@ impl Future for OnClose<'_> {
type Output = bool;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
- let res = self.get_mut().tx.poll_closed(cx);
- Ready(res.is_ready())
+ let fut = self.get_mut().tx.closed();
+ crate::pin!(fut);
+
+ Ready(fut.poll(cx).is_ready())
}
}
diff --git a/src/sync/tests/loom_semaphore_ll.rs b/src/sync/tests/loom_semaphore_ll.rs
deleted file mode 100644
index b5e5efb..0000000
--- a/src/sync/tests/loom_semaphore_ll.rs
+++ /dev/null
@@ -1,192 +0,0 @@
-use crate::sync::semaphore_ll::*;
-
-use futures::future::poll_fn;
-use loom::future::block_on;
-use loom::thread;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::SeqCst;
-use std::sync::Arc;
-use std::task::Poll::Ready;
-use std::task::{Context, Poll};
-
-#[test]
-fn basic_usage() {
- const NUM: usize = 2;
-
- struct Actor {
- waiter: Permit,
- shared: Arc<Shared>,
- }
-
- struct Shared {
- semaphore: Semaphore,
- active: AtomicUsize,
- }
-
- impl Future for Actor {
- type Output = ();
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- let me = &mut *self;
-
- ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap();
-
- let actual = me.shared.active.fetch_add(1, SeqCst);
- assert!(actual <= NUM - 1);
-
- let actual = me.shared.active.fetch_sub(1, SeqCst);
- assert!(actual <= NUM);
-
- me.waiter.release(1, &me.shared.semaphore);
-
- Ready(())
- }
- }
-
- loom::model(|| {
- let shared = Arc::new(Shared {
- semaphore: Semaphore::new(NUM),
- active: AtomicUsize::new(0),
- });
-
- for _ in 0..NUM {
- let shared = shared.clone();
-
- thread::spawn(move || {
- block_on(Actor {
- waiter: Permit::new(),
- shared,
- });
- });
- }
-
- block_on(Actor {
- waiter: Permit::new(),
- shared,
- });
- });
-}
-
-#[test]
-fn release() {
- loom::model(|| {
- let semaphore = Arc::new(Semaphore::new(1));
-
- {
- let semaphore = semaphore.clone();
- thread::spawn(move || {
- let mut permit = Permit::new();
-
- block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
-
- permit.release(1, &semaphore);
- });
- }
-
- let mut permit = Permit::new();
-
- block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap();
-
- permit.release(1, &semaphore);
- });
-}
-
-#[test]
-fn basic_closing() {
- const NUM: usize = 2;
-
- loom::model(|| {
- let semaphore = Arc::new(Semaphore::new(1));
-
- for _ in 0..NUM {
- let semaphore = semaphore.clone();
-
- thread::spawn(move || {
- let mut permit = Permit::new();
-
- for _ in 0..2 {
- block_on(poll_fn(|cx| {
- permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
- }))?;
-
- permit.release(1, &semaphore);
- }
-
- Ok::<(), ()>(())
- });
- }
-
- semaphore.close();
- });
-}
-
-#[test]
-fn concurrent_close() {
- const NUM: usize = 3;
-
- loom::model(|| {
- let semaphore = Arc::new(Semaphore::new(1));
-
- for _ in 0..NUM {
- let semaphore = semaphore.clone();
-
- thread::spawn(move || {
- let mut permit = Permit::new();
-
- block_on(poll_fn(|cx| {
- permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ())
- }))?;
-
- permit.release(1, &semaphore);
-
- semaphore.close();
-
- Ok::<(), ()>(())
- });
- }
- });
-}
-
-#[test]
-fn batch() {
- let mut b = loom::model::Builder::new();
- b.preemption_bound = Some(1);
-
- b.check(|| {
- let semaphore = Arc::new(Semaphore::new(10));
- let active = Arc::new(AtomicUsize::new(0));
- let mut ths = vec![];
-
- for _ in 0..2 {
- let semaphore = semaphore.clone();
- let active = active.clone();
-
- ths.push(thread::spawn(move || {
- let mut permit = Permit::new();
-
- for n in &[4, 10, 8] {
- block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap();
-
- active.fetch_add(*n as usize, SeqCst);
-
- let num_active = active.load(SeqCst);
- assert!(num_active <= 10);
-
- thread::yield_now();
-
- active.fetch_sub(*n as usize, SeqCst);
-
- permit.release(*n, &semaphore);
- }
- }));
- }
-
- for th in ths.into_iter() {
- th.join().unwrap();
- }
-
- assert_eq!(10, semaphore.available_permits());
- });
-}
diff --git a/src/sync/tests/loom_watch.rs b/src/sync/tests/loom_watch.rs
new file mode 100644
index 0000000..c575b5b
--- /dev/null
+++ b/src/sync/tests/loom_watch.rs
@@ -0,0 +1,36 @@
+use crate::sync::watch;
+
+use loom::future::block_on;
+use loom::thread;
+
+#[test]
+fn smoke() {
+ loom::model(|| {
+ let (tx, mut rx1) = watch::channel(1);
+ let mut rx2 = rx1.clone();
+ let mut rx3 = rx1.clone();
+ let mut rx4 = rx1.clone();
+ let mut rx5 = rx1.clone();
+
+ let th = thread::spawn(move || {
+ tx.send(2).unwrap();
+ });
+
+ block_on(rx1.changed()).unwrap();
+ assert_eq!(*rx1.borrow(), 2);
+
+ block_on(rx2.changed()).unwrap();
+ assert_eq!(*rx2.borrow(), 2);
+
+ block_on(rx3.changed()).unwrap();
+ assert_eq!(*rx3.borrow(), 2);
+
+ block_on(rx4.changed()).unwrap();
+ assert_eq!(*rx4.borrow(), 2);
+
+ block_on(rx5.changed()).unwrap();
+ assert_eq!(*rx5.borrow(), 2);
+
+ th.join().unwrap();
+ })
+}
diff --git a/src/sync/tests/mod.rs b/src/sync/tests/mod.rs
index 6ba8c1f..a78be6f 100644
--- a/src/sync/tests/mod.rs
+++ b/src/sync/tests/mod.rs
@@ -1,18 +1,15 @@
cfg_not_loom! {
mod atomic_waker;
- mod semaphore_ll;
mod semaphore_batch;
}
cfg_loom! {
mod loom_atomic_waker;
mod loom_broadcast;
- #[cfg(tokio_unstable)]
- mod loom_cancellation_token;
mod loom_list;
mod loom_mpsc;
mod loom_notify;
mod loom_oneshot;
mod loom_semaphore_batch;
- mod loom_semaphore_ll;
+ mod loom_watch;
}
diff --git a/src/sync/tests/semaphore_ll.rs b/src/sync/tests/semaphore_ll.rs
deleted file mode 100644
index bfb0757..0000000
--- a/src/sync/tests/semaphore_ll.rs
+++ /dev/null
@@ -1,470 +0,0 @@
-use crate::sync::semaphore_ll::{Permit, Semaphore};
-use tokio_test::*;
-
-#[test]
-fn poll_acquire_one_available() {
- let s = Semaphore::new(100);
- assert_eq!(s.available_permits(), 100);
-
- // Polling for a permit succeeds immediately
- let mut permit = task::spawn(Permit::new());
- assert!(!permit.is_acquired());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 99);
- assert!(permit.is_acquired());
-
- // Polling again on the same waiter does not claim a new permit
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 99);
- assert!(permit.is_acquired());
-}
-
-#[test]
-fn poll_acquire_many_available() {
- let s = Semaphore::new(100);
- assert_eq!(s.available_permits(), 100);
-
- // Polling for a permit succeeds immediately
- let mut permit = task::spawn(Permit::new());
- assert!(!permit.is_acquired());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-
- // Polling again on the same waiter does not claim a new permit
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-
- // Polling for a larger number of permits acquires more
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 8, &s)));
- assert_eq!(s.available_permits(), 92);
- assert!(permit.is_acquired());
-}
-
-#[test]
-fn try_acquire_one_available() {
- let s = Semaphore::new(100);
- assert_eq!(s.available_permits(), 100);
-
- // Polling for a permit succeeds immediately
- let mut permit = Permit::new();
- assert!(!permit.is_acquired());
-
- assert_ok!(permit.try_acquire(1, &s));
- assert_eq!(s.available_permits(), 99);
- assert!(permit.is_acquired());
-
- // Polling again on the same waiter does not claim a new permit
- assert_ok!(permit.try_acquire(1, &s));
- assert_eq!(s.available_permits(), 99);
- assert!(permit.is_acquired());
-}
-
-#[test]
-fn try_acquire_many_available() {
- let s = Semaphore::new(100);
- assert_eq!(s.available_permits(), 100);
-
- // Polling for a permit succeeds immediately
- let mut permit = Permit::new();
- assert!(!permit.is_acquired());
-
- assert_ok!(permit.try_acquire(5, &s));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-
- // Polling again on the same waiter does not claim a new permit
- assert_ok!(permit.try_acquire(5, &s));
- assert_eq!(s.available_permits(), 95);
- assert!(permit.is_acquired());
-}
-
-#[test]
-fn poll_acquire_one_unavailable() {
- let s = Semaphore::new(1);
-
- let mut permit_1 = task::spawn(Permit::new());
- let mut permit_2 = task::spawn(Permit::new());
-
- // Acquire the first permit
- assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 0);
-
- permit_2.enter(|cx, mut p| {
- // Try to acquire the second permit
- assert_pending!(p.poll_acquire(cx, 1, &s));
- });
-
- permit_1.release(1, &s);
-
- assert_eq!(s.available_permits(), 0);
- assert!(permit_2.is_woken());
- assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- permit_2.release(1, &s);
- assert_eq!(s.available_permits(), 1);
-}
-
-#[test]
-fn forget_acquired() {
- let s = Semaphore::new(1);
-
- // Polling for a permit succeeds immediately
- let mut permit = task::spawn(Permit::new());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_eq!(s.available_permits(), 0);
-
- permit.forget(1);
- assert_eq!(s.available_permits(), 0);
-}
-
-#[test]
-fn forget_waiting() {
- let s = Semaphore::new(0);
-
- // Polling for a permit succeeds immediately
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_eq!(s.available_permits(), 0);
-
- permit.forget(1);
-
- s.add_permits(1);
-
- assert!(!permit.is_woken());
- assert_eq!(s.available_permits(), 1);
-}
-
-#[test]
-fn poll_acquire_many_unavailable() {
- let s = Semaphore::new(5);
-
- let mut permit_1 = task::spawn(Permit::new());
- let mut permit_2 = task::spawn(Permit::new());
- let mut permit_3 = task::spawn(Permit::new());
-
- // Acquire the first permit
- assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 4);
-
- permit_2.enter(|cx, mut p| {
- // Try to acquire the second permit
- assert_pending!(p.poll_acquire(cx, 5, &s));
- });
-
- assert_eq!(s.available_permits(), 0);
-
- permit_3.enter(|cx, mut p| {
- // Try to acquire the third permit
- assert_pending!(p.poll_acquire(cx, 3, &s));
- });
-
- permit_1.release(1, &s);
-
- assert_eq!(s.available_permits(), 0);
- assert!(permit_2.is_woken());
- assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
-
- assert!(!permit_3.is_woken());
- assert_eq!(s.available_permits(), 0);
-
- permit_2.release(1, &s);
- assert!(!permit_3.is_woken());
- assert_eq!(s.available_permits(), 0);
-
- permit_2.release(2, &s);
- assert!(permit_3.is_woken());
-
- assert_ready_ok!(permit_3.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-}
-
-#[test]
-fn try_acquire_one_unavailable() {
- let s = Semaphore::new(1);
-
- let mut permit_1 = Permit::new();
- let mut permit_2 = Permit::new();
-
- // Acquire the first permit
- assert_ok!(permit_1.try_acquire(1, &s));
- assert_eq!(s.available_permits(), 0);
-
- assert_err!(permit_2.try_acquire(1, &s));
-
- permit_1.release(1, &s);
-
- assert_eq!(s.available_permits(), 1);
- assert_ok!(permit_2.try_acquire(1, &s));
-
- permit_2.release(1, &s);
- assert_eq!(s.available_permits(), 1);
-}
-
-#[test]
-fn try_acquire_many_unavailable() {
- let s = Semaphore::new(5);
-
- let mut permit_1 = Permit::new();
- let mut permit_2 = Permit::new();
-
- // Acquire the first permit
- assert_ok!(permit_1.try_acquire(1, &s));
- assert_eq!(s.available_permits(), 4);
-
- assert_err!(permit_2.try_acquire(5, &s));
-
- permit_1.release(1, &s);
- assert_eq!(s.available_permits(), 5);
-
- assert_ok!(permit_2.try_acquire(5, &s));
-
- permit_2.release(1, &s);
- assert_eq!(s.available_permits(), 1);
-
- permit_2.release(1, &s);
- assert_eq!(s.available_permits(), 2);
-}
-
-#[test]
-fn poll_acquire_one_zero_permits() {
- let s = Semaphore::new(0);
- assert_eq!(s.available_permits(), 0);
-
- let mut permit = task::spawn(Permit::new());
-
- // Try to acquire the permit
- permit.enter(|cx, mut p| {
- assert_pending!(p.poll_acquire(cx, 1, &s));
- });
-
- s.add_permits(1);
-
- assert!(permit.is_woken());
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-}
-
-#[test]
-#[should_panic]
-fn validates_max_permits() {
- use std::usize;
- Semaphore::new((usize::MAX >> 2) + 1);
-}
-
-#[test]
-fn close_semaphore_prevents_acquire() {
- let s = Semaphore::new(5);
- s.close();
-
- assert_eq!(5, s.available_permits());
-
- let mut permit_1 = task::spawn(Permit::new());
- let mut permit_2 = task::spawn(Permit::new());
-
- assert_ready_err!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(5, s.available_permits());
-
- assert_ready_err!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- assert_eq!(5, s.available_permits());
-}
-
-#[test]
-fn close_semaphore_notifies_permit1() {
- let s = Semaphore::new(0);
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- s.close();
-
- assert!(permit.is_woken());
- assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-}
-
-#[test]
-fn close_semaphore_notifies_permit2() {
- let s = Semaphore::new(2);
-
- let mut permit1 = task::spawn(Permit::new());
- let mut permit2 = task::spawn(Permit::new());
- let mut permit3 = task::spawn(Permit::new());
- let mut permit4 = task::spawn(Permit::new());
-
- // Acquire a couple of permits
- assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- s.close();
-
- assert!(permit3.is_woken());
- assert!(permit4.is_woken());
-
- assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_eq!(0, s.available_permits());
-
- permit1.release(1, &s);
-
- assert_eq!(1, s.available_permits());
-
- assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- permit2.release(1, &s);
-
- assert_eq!(2, s.available_permits());
-}
-
-#[test]
-fn poll_acquire_additional_permits_while_waiting_before_assigned() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-
- s.add_permits(1);
- assert!(!permit.is_woken());
-
- s.add_permits(1);
- assert!(permit.is_woken());
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-}
-
-#[test]
-fn try_acquire_additional_permits_while_waiting_before_assigned() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
-
- assert_err!(permit.enter(|_, mut p| p.try_acquire(3, &s)));
-
- s.add_permits(1);
- assert!(permit.is_woken());
-
- assert_ok!(permit.enter(|_, mut p| p.try_acquire(2, &s)));
-}
-
-#[test]
-fn poll_acquire_additional_permits_while_waiting_after_assigned_success() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
-
- s.add_permits(2);
-
- assert!(permit.is_woken());
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-}
-
-#[test]
-fn poll_acquire_additional_permits_while_waiting_after_assigned_requeue() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
-
- s.add_permits(2);
-
- assert!(permit.is_woken());
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s)));
-
- s.add_permits(1);
-
- assert!(permit.is_woken());
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s)));
-}
-
-#[test]
-fn poll_acquire_fewer_permits_while_waiting() {
- let s = Semaphore::new(1);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- assert_eq!(s.available_permits(), 0);
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
- assert_eq!(s.available_permits(), 0);
-}
-
-#[test]
-fn poll_acquire_fewer_permits_after_assigned() {
- let s = Semaphore::new(1);
-
- let mut permit1 = task::spawn(Permit::new());
- let mut permit2 = task::spawn(Permit::new());
-
- assert_pending!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 5, &s)));
- assert_eq!(s.available_permits(), 0);
-
- assert_pending!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- s.add_permits(4);
- assert!(permit1.is_woken());
- assert!(!permit2.is_woken());
-
- assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 3, &s)));
-
- assert!(permit2.is_woken());
- assert_eq!(s.available_permits(), 1);
-
- assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-}
-
-#[test]
-fn forget_partial_1() {
- let s = Semaphore::new(0);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- s.add_permits(1);
-
- assert_eq!(0, s.available_permits());
-
- permit.release(1, &s);
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s)));
-
- assert_eq!(s.available_permits(), 0);
-}
-
-#[test]
-fn forget_partial_2() {
- let s = Semaphore::new(0);
-
- let mut permit = task::spawn(Permit::new());
-
- assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- s.add_permits(1);
-
- assert_eq!(0, s.available_permits());
-
- permit.release(1, &s);
-
- s.add_permits(1);
-
- assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s)));
- assert_eq!(s.available_permits(), 0);
-}