diff options
Diffstat (limited to 'src/sync/tests')
-rw-r--r-- | src/sync/tests/loom_broadcast.rs | 2 | ||||
-rw-r--r-- | src/sync/tests/loom_cancellation_token.rs | 155 | ||||
-rw-r--r-- | src/sync/tests/loom_mpsc.rs | 71 | ||||
-rw-r--r-- | src/sync/tests/loom_notify.rs | 12 | ||||
-rw-r--r-- | src/sync/tests/loom_oneshot.rs | 6 | ||||
-rw-r--r-- | src/sync/tests/loom_semaphore_ll.rs | 192 | ||||
-rw-r--r-- | src/sync/tests/loom_watch.rs | 36 | ||||
-rw-r--r-- | src/sync/tests/mod.rs | 5 | ||||
-rw-r--r-- | src/sync/tests/semaphore_ll.rs | 470 |
9 files changed, 112 insertions, 837 deletions
diff --git a/src/sync/tests/loom_broadcast.rs b/src/sync/tests/loom_broadcast.rs index da12fb9..4b1f034 100644 --- a/src/sync/tests/loom_broadcast.rs +++ b/src/sync/tests/loom_broadcast.rs @@ -1,5 +1,5 @@ use crate::sync::broadcast; -use crate::sync::broadcast::RecvError::{Closed, Lagged}; +use crate::sync::broadcast::error::RecvError::{Closed, Lagged}; use loom::future::block_on; use loom::sync::Arc; diff --git a/src/sync/tests/loom_cancellation_token.rs b/src/sync/tests/loom_cancellation_token.rs deleted file mode 100644 index e9c9f3d..0000000 --- a/src/sync/tests/loom_cancellation_token.rs +++ /dev/null @@ -1,155 +0,0 @@ -use crate::sync::CancellationToken; - -use loom::{future::block_on, thread}; -use tokio_test::assert_ok; - -#[test] -fn cancel_token() { - loom::model(|| { - let token = CancellationToken::new(); - let token1 = token.clone(); - - let th1 = thread::spawn(move || { - block_on(async { - token1.cancelled().await; - }); - }); - - let th2 = thread::spawn(move || { - token.cancel(); - }); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - }); -} - -#[test] -fn cancel_with_child() { - loom::model(|| { - let token = CancellationToken::new(); - let token1 = token.clone(); - let token2 = token.clone(); - let child_token = token.child_token(); - - let th1 = thread::spawn(move || { - block_on(async { - token1.cancelled().await; - }); - }); - - let th2 = thread::spawn(move || { - token2.cancel(); - }); - - let th3 = thread::spawn(move || { - block_on(async { - child_token.cancelled().await; - }); - }); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - assert_ok!(th3.join()); - }); -} - -#[test] -fn drop_token_no_child() { - loom::model(|| { - let token = CancellationToken::new(); - let token1 = token.clone(); - let token2 = token.clone(); - - let th1 = thread::spawn(move || { - drop(token1); - }); - - let th2 = thread::spawn(move || { - drop(token2); - }); - - let th3 = thread::spawn(move || { - drop(token); - }); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - assert_ok!(th3.join()); - }); -} - -#[test] -fn drop_token_with_childs() { - loom::model(|| { - let token1 = CancellationToken::new(); - let child_token1 = token1.child_token(); - let child_token2 = token1.child_token(); - - let th1 = thread::spawn(move || { - drop(token1); - }); - - let th2 = thread::spawn(move || { - drop(child_token1); - }); - - let th3 = thread::spawn(move || { - drop(child_token2); - }); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - assert_ok!(th3.join()); - }); -} - -#[test] -fn drop_and_cancel_token() { - loom::model(|| { - let token1 = CancellationToken::new(); - let token2 = token1.clone(); - let child_token = token1.child_token(); - - let th1 = thread::spawn(move || { - drop(token1); - }); - - let th2 = thread::spawn(move || { - token2.cancel(); - }); - - let th3 = thread::spawn(move || { - drop(child_token); - }); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - assert_ok!(th3.join()); - }); -} - -#[test] -fn cancel_parent_and_child() { - loom::model(|| { - let token1 = CancellationToken::new(); - let token2 = token1.clone(); - let child_token = token1.child_token(); - - let th1 = thread::spawn(move || { - drop(token1); - }); - - let th2 = thread::spawn(move || { - token2.cancel(); - }); - - let th3 = thread::spawn(move || { - child_token.cancel(); - }); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - assert_ok!(th3.join()); - }); -} diff --git a/src/sync/tests/loom_mpsc.rs b/src/sync/tests/loom_mpsc.rs index 6a1a6ab..c12313b 100644 --- a/src/sync/tests/loom_mpsc.rs +++ b/src/sync/tests/loom_mpsc.rs @@ -2,22 +2,24 @@ use crate::sync::mpsc; use futures::future::poll_fn; use loom::future::block_on; +use loom::sync::Arc; use loom::thread; +use tokio_test::assert_ok; #[test] fn closing_tx() { loom::model(|| { - let (mut tx, mut rx) = mpsc::channel(16); + let (tx, mut rx) = mpsc::channel(16); thread::spawn(move || { tx.try_send(()).unwrap(); drop(tx); }); - let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); + let v = block_on(rx.recv()); assert!(v.is_some()); - let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); + let v = block_on(rx.recv()); assert!(v.is_none()); }); } @@ -32,15 +34,70 @@ fn closing_unbounded_tx() { drop(tx); }); - let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); + let v = block_on(rx.recv()); assert!(v.is_some()); - let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); + let v = block_on(rx.recv()); assert!(v.is_none()); }); } #[test] +fn closing_bounded_rx() { + loom::model(|| { + let (tx1, rx) = mpsc::channel::<()>(16); + let tx2 = tx1.clone(); + thread::spawn(move || { + drop(rx); + }); + + block_on(tx1.closed()); + block_on(tx2.closed()); + }); +} + +#[test] +fn closing_and_sending() { + loom::model(|| { + let (tx1, mut rx) = mpsc::channel::<()>(16); + let tx1 = Arc::new(tx1); + let tx2 = tx1.clone(); + + let th1 = thread::spawn(move || { + tx1.try_send(()).unwrap(); + }); + + let th2 = thread::spawn(move || { + block_on(tx2.closed()); + }); + + let th3 = thread::spawn(move || { + let v = block_on(rx.recv()); + assert!(v.is_some()); + drop(rx); + }); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + assert_ok!(th3.join()); + }); +} + +#[test] +fn closing_unbounded_rx() { + loom::model(|| { + let (tx1, rx) = mpsc::unbounded_channel::<()>(); + let tx2 = tx1.clone(); + thread::spawn(move || { + drop(rx); + }); + + block_on(tx1.closed()); + block_on(tx2.closed()); + }); +} + +#[test] fn dropping_tx() { loom::model(|| { let (tx, mut rx) = mpsc::channel::<()>(16); @@ -53,7 +110,7 @@ fn dropping_tx() { } drop(tx); - let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); + let v = block_on(rx.recv()); assert!(v.is_none()); }); } @@ -71,7 +128,7 @@ fn dropping_unbounded_tx() { } drop(tx); - let v = block_on(poll_fn(|cx| rx.poll_recv(cx))); + let v = block_on(rx.recv()); assert!(v.is_none()); }); } diff --git a/src/sync/tests/loom_notify.rs b/src/sync/tests/loom_notify.rs index 60981d4..79a5bf8 100644 --- a/src/sync/tests/loom_notify.rs +++ b/src/sync/tests/loom_notify.rs @@ -16,7 +16,7 @@ fn notify_one() { }); }); - tx.notify(); + tx.notify_one(); th.join().unwrap(); }); } @@ -34,12 +34,12 @@ fn notify_multi() { ths.push(thread::spawn(move || { block_on(async { notify.notified().await; - notify.notify(); + notify.notify_one(); }) })); } - notify.notify(); + notify.notify_one(); for th in ths.drain(..) { th.join().unwrap(); @@ -67,7 +67,7 @@ fn notify_drop() { block_on(poll_fn(|cx| { if recv.as_mut().poll(cx).is_ready() { - rx1.notify(); + rx1.notify_one(); } Poll::Ready(()) })); @@ -77,12 +77,12 @@ fn notify_drop() { block_on(async { rx2.notified().await; // Trigger second notification - rx2.notify(); + rx2.notify_one(); rx2.notified().await; }); }); - notify.notify(); + notify.notify_one(); th1.join().unwrap(); th2.join().unwrap(); diff --git a/src/sync/tests/loom_oneshot.rs b/src/sync/tests/loom_oneshot.rs index dfa7459..9729cfb 100644 --- a/src/sync/tests/loom_oneshot.rs +++ b/src/sync/tests/loom_oneshot.rs @@ -75,8 +75,10 @@ impl Future for OnClose<'_> { type Output = bool; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> { - let res = self.get_mut().tx.poll_closed(cx); - Ready(res.is_ready()) + let fut = self.get_mut().tx.closed(); + crate::pin!(fut); + + Ready(fut.poll(cx).is_ready()) } } diff --git a/src/sync/tests/loom_semaphore_ll.rs b/src/sync/tests/loom_semaphore_ll.rs deleted file mode 100644 index b5e5efb..0000000 --- a/src/sync/tests/loom_semaphore_ll.rs +++ /dev/null @@ -1,192 +0,0 @@ -use crate::sync::semaphore_ll::*; - -use futures::future::poll_fn; -use loom::future::block_on; -use loom::thread; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::Arc; -use std::task::Poll::Ready; -use std::task::{Context, Poll}; - -#[test] -fn basic_usage() { - const NUM: usize = 2; - - struct Actor { - waiter: Permit, - shared: Arc<Shared>, - } - - struct Shared { - semaphore: Semaphore, - active: AtomicUsize, - } - - impl Future for Actor { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - let me = &mut *self; - - ready!(me.waiter.poll_acquire(cx, 1, &me.shared.semaphore)).unwrap(); - - let actual = me.shared.active.fetch_add(1, SeqCst); - assert!(actual <= NUM - 1); - - let actual = me.shared.active.fetch_sub(1, SeqCst); - assert!(actual <= NUM); - - me.waiter.release(1, &me.shared.semaphore); - - Ready(()) - } - } - - loom::model(|| { - let shared = Arc::new(Shared { - semaphore: Semaphore::new(NUM), - active: AtomicUsize::new(0), - }); - - for _ in 0..NUM { - let shared = shared.clone(); - - thread::spawn(move || { - block_on(Actor { - waiter: Permit::new(), - shared, - }); - }); - } - - block_on(Actor { - waiter: Permit::new(), - shared, - }); - }); -} - -#[test] -fn release() { - loom::model(|| { - let semaphore = Arc::new(Semaphore::new(1)); - - { - let semaphore = semaphore.clone(); - thread::spawn(move || { - let mut permit = Permit::new(); - - block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap(); - - permit.release(1, &semaphore); - }); - } - - let mut permit = Permit::new(); - - block_on(poll_fn(|cx| permit.poll_acquire(cx, 1, &semaphore))).unwrap(); - - permit.release(1, &semaphore); - }); -} - -#[test] -fn basic_closing() { - const NUM: usize = 2; - - loom::model(|| { - let semaphore = Arc::new(Semaphore::new(1)); - - for _ in 0..NUM { - let semaphore = semaphore.clone(); - - thread::spawn(move || { - let mut permit = Permit::new(); - - for _ in 0..2 { - block_on(poll_fn(|cx| { - permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ()) - }))?; - - permit.release(1, &semaphore); - } - - Ok::<(), ()>(()) - }); - } - - semaphore.close(); - }); -} - -#[test] -fn concurrent_close() { - const NUM: usize = 3; - - loom::model(|| { - let semaphore = Arc::new(Semaphore::new(1)); - - for _ in 0..NUM { - let semaphore = semaphore.clone(); - - thread::spawn(move || { - let mut permit = Permit::new(); - - block_on(poll_fn(|cx| { - permit.poll_acquire(cx, 1, &semaphore).map_err(|_| ()) - }))?; - - permit.release(1, &semaphore); - - semaphore.close(); - - Ok::<(), ()>(()) - }); - } - }); -} - -#[test] -fn batch() { - let mut b = loom::model::Builder::new(); - b.preemption_bound = Some(1); - - b.check(|| { - let semaphore = Arc::new(Semaphore::new(10)); - let active = Arc::new(AtomicUsize::new(0)); - let mut ths = vec![]; - - for _ in 0..2 { - let semaphore = semaphore.clone(); - let active = active.clone(); - - ths.push(thread::spawn(move || { - let mut permit = Permit::new(); - - for n in &[4, 10, 8] { - block_on(poll_fn(|cx| permit.poll_acquire(cx, *n, &semaphore))).unwrap(); - - active.fetch_add(*n as usize, SeqCst); - - let num_active = active.load(SeqCst); - assert!(num_active <= 10); - - thread::yield_now(); - - active.fetch_sub(*n as usize, SeqCst); - - permit.release(*n, &semaphore); - } - })); - } - - for th in ths.into_iter() { - th.join().unwrap(); - } - - assert_eq!(10, semaphore.available_permits()); - }); -} diff --git a/src/sync/tests/loom_watch.rs b/src/sync/tests/loom_watch.rs new file mode 100644 index 0000000..c575b5b --- /dev/null +++ b/src/sync/tests/loom_watch.rs @@ -0,0 +1,36 @@ +use crate::sync::watch; + +use loom::future::block_on; +use loom::thread; + +#[test] +fn smoke() { + loom::model(|| { + let (tx, mut rx1) = watch::channel(1); + let mut rx2 = rx1.clone(); + let mut rx3 = rx1.clone(); + let mut rx4 = rx1.clone(); + let mut rx5 = rx1.clone(); + + let th = thread::spawn(move || { + tx.send(2).unwrap(); + }); + + block_on(rx1.changed()).unwrap(); + assert_eq!(*rx1.borrow(), 2); + + block_on(rx2.changed()).unwrap(); + assert_eq!(*rx2.borrow(), 2); + + block_on(rx3.changed()).unwrap(); + assert_eq!(*rx3.borrow(), 2); + + block_on(rx4.changed()).unwrap(); + assert_eq!(*rx4.borrow(), 2); + + block_on(rx5.changed()).unwrap(); + assert_eq!(*rx5.borrow(), 2); + + th.join().unwrap(); + }) +} diff --git a/src/sync/tests/mod.rs b/src/sync/tests/mod.rs index 6ba8c1f..a78be6f 100644 --- a/src/sync/tests/mod.rs +++ b/src/sync/tests/mod.rs @@ -1,18 +1,15 @@ cfg_not_loom! { mod atomic_waker; - mod semaphore_ll; mod semaphore_batch; } cfg_loom! { mod loom_atomic_waker; mod loom_broadcast; - #[cfg(tokio_unstable)] - mod loom_cancellation_token; mod loom_list; mod loom_mpsc; mod loom_notify; mod loom_oneshot; mod loom_semaphore_batch; - mod loom_semaphore_ll; + mod loom_watch; } diff --git a/src/sync/tests/semaphore_ll.rs b/src/sync/tests/semaphore_ll.rs deleted file mode 100644 index bfb0757..0000000 --- a/src/sync/tests/semaphore_ll.rs +++ /dev/null @@ -1,470 +0,0 @@ -use crate::sync::semaphore_ll::{Permit, Semaphore}; -use tokio_test::*; - -#[test] -fn poll_acquire_one_available() { - let s = Semaphore::new(100); - assert_eq!(s.available_permits(), 100); - - // Polling for a permit succeeds immediately - let mut permit = task::spawn(Permit::new()); - assert!(!permit.is_acquired()); - - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_eq!(s.available_permits(), 99); - assert!(permit.is_acquired()); - - // Polling again on the same waiter does not claim a new permit - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_eq!(s.available_permits(), 99); - assert!(permit.is_acquired()); -} - -#[test] -fn poll_acquire_many_available() { - let s = Semaphore::new(100); - assert_eq!(s.available_permits(), 100); - - // Polling for a permit succeeds immediately - let mut permit = task::spawn(Permit::new()); - assert!(!permit.is_acquired()); - - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s))); - assert_eq!(s.available_permits(), 95); - assert!(permit.is_acquired()); - - // Polling again on the same waiter does not claim a new permit - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_eq!(s.available_permits(), 95); - assert!(permit.is_acquired()); - - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 5, &s))); - assert_eq!(s.available_permits(), 95); - assert!(permit.is_acquired()); - - // Polling for a larger number of permits acquires more - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 8, &s))); - assert_eq!(s.available_permits(), 92); - assert!(permit.is_acquired()); -} - -#[test] -fn try_acquire_one_available() { - let s = Semaphore::new(100); - assert_eq!(s.available_permits(), 100); - - // Polling for a permit succeeds immediately - let mut permit = Permit::new(); - assert!(!permit.is_acquired()); - - assert_ok!(permit.try_acquire(1, &s)); - assert_eq!(s.available_permits(), 99); - assert!(permit.is_acquired()); - - // Polling again on the same waiter does not claim a new permit - assert_ok!(permit.try_acquire(1, &s)); - assert_eq!(s.available_permits(), 99); - assert!(permit.is_acquired()); -} - -#[test] -fn try_acquire_many_available() { - let s = Semaphore::new(100); - assert_eq!(s.available_permits(), 100); - - // Polling for a permit succeeds immediately - let mut permit = Permit::new(); - assert!(!permit.is_acquired()); - - assert_ok!(permit.try_acquire(5, &s)); - assert_eq!(s.available_permits(), 95); - assert!(permit.is_acquired()); - - // Polling again on the same waiter does not claim a new permit - assert_ok!(permit.try_acquire(5, &s)); - assert_eq!(s.available_permits(), 95); - assert!(permit.is_acquired()); -} - -#[test] -fn poll_acquire_one_unavailable() { - let s = Semaphore::new(1); - - let mut permit_1 = task::spawn(Permit::new()); - let mut permit_2 = task::spawn(Permit::new()); - - // Acquire the first permit - assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_eq!(s.available_permits(), 0); - - permit_2.enter(|cx, mut p| { - // Try to acquire the second permit - assert_pending!(p.poll_acquire(cx, 1, &s)); - }); - - permit_1.release(1, &s); - - assert_eq!(s.available_permits(), 0); - assert!(permit_2.is_woken()); - assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - permit_2.release(1, &s); - assert_eq!(s.available_permits(), 1); -} - -#[test] -fn forget_acquired() { - let s = Semaphore::new(1); - - // Polling for a permit succeeds immediately - let mut permit = task::spawn(Permit::new()); - - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - assert_eq!(s.available_permits(), 0); - - permit.forget(1); - assert_eq!(s.available_permits(), 0); -} - -#[test] -fn forget_waiting() { - let s = Semaphore::new(0); - - // Polling for a permit succeeds immediately - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - assert_eq!(s.available_permits(), 0); - - permit.forget(1); - - s.add_permits(1); - - assert!(!permit.is_woken()); - assert_eq!(s.available_permits(), 1); -} - -#[test] -fn poll_acquire_many_unavailable() { - let s = Semaphore::new(5); - - let mut permit_1 = task::spawn(Permit::new()); - let mut permit_2 = task::spawn(Permit::new()); - let mut permit_3 = task::spawn(Permit::new()); - - // Acquire the first permit - assert_ready_ok!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_eq!(s.available_permits(), 4); - - permit_2.enter(|cx, mut p| { - // Try to acquire the second permit - assert_pending!(p.poll_acquire(cx, 5, &s)); - }); - - assert_eq!(s.available_permits(), 0); - - permit_3.enter(|cx, mut p| { - // Try to acquire the third permit - assert_pending!(p.poll_acquire(cx, 3, &s)); - }); - - permit_1.release(1, &s); - - assert_eq!(s.available_permits(), 0); - assert!(permit_2.is_woken()); - assert_ready_ok!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 5, &s))); - - assert!(!permit_3.is_woken()); - assert_eq!(s.available_permits(), 0); - - permit_2.release(1, &s); - assert!(!permit_3.is_woken()); - assert_eq!(s.available_permits(), 0); - - permit_2.release(2, &s); - assert!(permit_3.is_woken()); - - assert_ready_ok!(permit_3.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); -} - -#[test] -fn try_acquire_one_unavailable() { - let s = Semaphore::new(1); - - let mut permit_1 = Permit::new(); - let mut permit_2 = Permit::new(); - - // Acquire the first permit - assert_ok!(permit_1.try_acquire(1, &s)); - assert_eq!(s.available_permits(), 0); - - assert_err!(permit_2.try_acquire(1, &s)); - - permit_1.release(1, &s); - - assert_eq!(s.available_permits(), 1); - assert_ok!(permit_2.try_acquire(1, &s)); - - permit_2.release(1, &s); - assert_eq!(s.available_permits(), 1); -} - -#[test] -fn try_acquire_many_unavailable() { - let s = Semaphore::new(5); - - let mut permit_1 = Permit::new(); - let mut permit_2 = Permit::new(); - - // Acquire the first permit - assert_ok!(permit_1.try_acquire(1, &s)); - assert_eq!(s.available_permits(), 4); - - assert_err!(permit_2.try_acquire(5, &s)); - - permit_1.release(1, &s); - assert_eq!(s.available_permits(), 5); - - assert_ok!(permit_2.try_acquire(5, &s)); - - permit_2.release(1, &s); - assert_eq!(s.available_permits(), 1); - - permit_2.release(1, &s); - assert_eq!(s.available_permits(), 2); -} - -#[test] -fn poll_acquire_one_zero_permits() { - let s = Semaphore::new(0); - assert_eq!(s.available_permits(), 0); - - let mut permit = task::spawn(Permit::new()); - - // Try to acquire the permit - permit.enter(|cx, mut p| { - assert_pending!(p.poll_acquire(cx, 1, &s)); - }); - - s.add_permits(1); - - assert!(permit.is_woken()); - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); -} - -#[test] -#[should_panic] -fn validates_max_permits() { - use std::usize; - Semaphore::new((usize::MAX >> 2) + 1); -} - -#[test] -fn close_semaphore_prevents_acquire() { - let s = Semaphore::new(5); - s.close(); - - assert_eq!(5, s.available_permits()); - - let mut permit_1 = task::spawn(Permit::new()); - let mut permit_2 = task::spawn(Permit::new()); - - assert_ready_err!(permit_1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_eq!(5, s.available_permits()); - - assert_ready_err!(permit_2.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - assert_eq!(5, s.available_permits()); -} - -#[test] -fn close_semaphore_notifies_permit1() { - let s = Semaphore::new(0); - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - s.close(); - - assert!(permit.is_woken()); - assert_ready_err!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); -} - -#[test] -fn close_semaphore_notifies_permit2() { - let s = Semaphore::new(2); - - let mut permit1 = task::spawn(Permit::new()); - let mut permit2 = task::spawn(Permit::new()); - let mut permit3 = task::spawn(Permit::new()); - let mut permit4 = task::spawn(Permit::new()); - - // Acquire a couple of permits - assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - assert_pending!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_pending!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - s.close(); - - assert!(permit3.is_woken()); - assert!(permit4.is_woken()); - - assert_ready_err!(permit3.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_ready_err!(permit4.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - assert_eq!(0, s.available_permits()); - - permit1.release(1, &s); - - assert_eq!(1, s.available_permits()); - - assert_ready_err!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - permit2.release(1, &s); - - assert_eq!(2, s.available_permits()); -} - -#[test] -fn poll_acquire_additional_permits_while_waiting_before_assigned() { - let s = Semaphore::new(1); - - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); - - s.add_permits(1); - assert!(!permit.is_woken()); - - s.add_permits(1); - assert!(permit.is_woken()); - - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); -} - -#[test] -fn try_acquire_additional_permits_while_waiting_before_assigned() { - let s = Semaphore::new(1); - - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - - assert_err!(permit.enter(|_, mut p| p.try_acquire(3, &s))); - - s.add_permits(1); - assert!(permit.is_woken()); - - assert_ok!(permit.enter(|_, mut p| p.try_acquire(2, &s))); -} - -#[test] -fn poll_acquire_additional_permits_while_waiting_after_assigned_success() { - let s = Semaphore::new(1); - - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - - s.add_permits(2); - - assert!(permit.is_woken()); - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); -} - -#[test] -fn poll_acquire_additional_permits_while_waiting_after_assigned_requeue() { - let s = Semaphore::new(1); - - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - - s.add_permits(2); - - assert!(permit.is_woken()); - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s))); - - s.add_permits(1); - - assert!(permit.is_woken()); - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 4, &s))); -} - -#[test] -fn poll_acquire_fewer_permits_while_waiting() { - let s = Semaphore::new(1); - - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - assert_eq!(s.available_permits(), 0); - - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - assert_eq!(s.available_permits(), 0); -} - -#[test] -fn poll_acquire_fewer_permits_after_assigned() { - let s = Semaphore::new(1); - - let mut permit1 = task::spawn(Permit::new()); - let mut permit2 = task::spawn(Permit::new()); - - assert_pending!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 5, &s))); - assert_eq!(s.available_permits(), 0); - - assert_pending!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - s.add_permits(4); - assert!(permit1.is_woken()); - assert!(!permit2.is_woken()); - - assert_ready_ok!(permit1.enter(|cx, mut p| p.poll_acquire(cx, 3, &s))); - - assert!(permit2.is_woken()); - assert_eq!(s.available_permits(), 1); - - assert_ready_ok!(permit2.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); -} - -#[test] -fn forget_partial_1() { - let s = Semaphore::new(0); - - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - s.add_permits(1); - - assert_eq!(0, s.available_permits()); - - permit.release(1, &s); - - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 1, &s))); - - assert_eq!(s.available_permits(), 0); -} - -#[test] -fn forget_partial_2() { - let s = Semaphore::new(0); - - let mut permit = task::spawn(Permit::new()); - - assert_pending!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - s.add_permits(1); - - assert_eq!(0, s.available_permits()); - - permit.release(1, &s); - - s.add_permits(1); - - assert_ready_ok!(permit.enter(|cx, mut p| p.poll_acquire(cx, 2, &s))); - assert_eq!(s.available_permits(), 0); -} |