aboutsummaryrefslogtreecommitdiff
path: root/src/sync/batch_semaphore.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/batch_semaphore.rs')
-rw-r--r--src/sync/batch_semaphore.rs16
1 files changed, 9 insertions, 7 deletions
diff --git a/src/sync/batch_semaphore.rs b/src/sync/batch_semaphore.rs
index 872d53e..9b43404 100644
--- a/src/sync/batch_semaphore.rs
+++ b/src/sync/batch_semaphore.rs
@@ -19,6 +19,7 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::util::linked_list::{self, LinkedList};
+use crate::util::WakeList;
use std::future::Future;
use std::marker::PhantomPinned;
@@ -239,12 +240,12 @@ impl Semaphore {
/// If `rem` exceeds the number of permits needed by the wait list, the
/// remainder are assigned back to the semaphore.
fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
- let mut wakers: [Option<Waker>; 8] = Default::default();
+ let mut wakers = WakeList::new();
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
- 'inner: for slot in &mut wakers[..] {
+ 'inner: while wakers.can_push() {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
Some(waiter) => {
@@ -260,7 +261,11 @@ impl Semaphore {
}
};
let mut waiter = waiters.queue.pop_back().unwrap();
- *slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
+ if let Some(waker) =
+ unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
+ {
+ wakers.push(waker);
+ }
}
if rem > 0 && is_empty {
@@ -283,10 +288,7 @@ impl Semaphore {
drop(waiters); // release the lock
- wakers
- .iter_mut()
- .filter_map(Option::take)
- .for_each(Waker::wake);
+ wakers.wake_all();
}
assert_eq!(rem, 0);