diff options
Diffstat (limited to 'src/sync/batch_semaphore.rs')
-rw-r--r-- | src/sync/batch_semaphore.rs | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/src/sync/batch_semaphore.rs b/src/sync/batch_semaphore.rs index 872d53e..9b43404 100644 --- a/src/sync/batch_semaphore.rs +++ b/src/sync/batch_semaphore.rs @@ -19,6 +19,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Mutex, MutexGuard}; use crate::util::linked_list::{self, LinkedList}; +use crate::util::WakeList; use std::future::Future; use std::marker::PhantomPinned; @@ -239,12 +240,12 @@ impl Semaphore { /// If `rem` exceeds the number of permits needed by the wait list, the /// remainder are assigned back to the semaphore. fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) { - let mut wakers: [Option<Waker>; 8] = Default::default(); + let mut wakers = WakeList::new(); let mut lock = Some(waiters); let mut is_empty = false; while rem > 0 { let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); - 'inner: for slot in &mut wakers[..] { + 'inner: while wakers.can_push() { // Was the waiter assigned enough permits to wake it? match waiters.queue.last() { Some(waiter) => { @@ -260,7 +261,11 @@ impl Semaphore { } }; let mut waiter = waiters.queue.pop_back().unwrap(); - *slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }; + if let Some(waker) = + unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) } + { + wakers.push(waker); + } } if rem > 0 && is_empty { @@ -283,10 +288,7 @@ impl Semaphore { drop(waiters); // release the lock - wakers - .iter_mut() - .filter_map(Option::take) - .for_each(Waker::wake); + wakers.wake_all(); } assert_eq!(rem, 0); |