diff options
Diffstat (limited to 'src/sync/notify.rs')
-rw-r--r-- | src/sync/notify.rs | 409 |
1 files changed, 305 insertions, 104 deletions
diff --git a/src/sync/notify.rs b/src/sync/notify.rs index efe16f9..bf00ca3 100644 --- a/src/sync/notify.rs +++ b/src/sync/notify.rs @@ -5,21 +5,22 @@ // triggers this warning but it is safe to ignore in this case. #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] +use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; -use crate::util::linked_list::{self, LinkedList}; +use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; use crate::util::WakeList; -use std::cell::UnsafeCell; use std::future::Future; use std::marker::PhantomPinned; use std::panic::{RefUnwindSafe, UnwindSafe}; use std::pin::Pin; use std::ptr::NonNull; -use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst}; use std::task::{Context, Poll, Waker}; type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; +type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>; /// Notifies a single task to wake up. /// @@ -198,37 +199,51 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; /// [`Semaphore`]: crate::sync::Semaphore #[derive(Debug)] pub struct Notify { - // This uses 2 bits to store one of `EMPTY`, + // `state` uses 2 bits to store one of `EMPTY`, // `WAITING` or `NOTIFIED`. The rest of the bits // are used to store the number of times `notify_waiters` // was called. + // + // Throughout the code there are two assumptions: + // - state can be transitioned *from* `WAITING` only if + // `waiters` lock is held + // - number of times `notify_waiters` was called can + // be modified only if `waiters` lock is held state: AtomicUsize, waiters: Mutex<WaitList>, } -#[derive(Debug, Clone, Copy)] -enum NotificationType { - // Notification triggered by calling `notify_waiters` - AllWaiters, - // Notification triggered by calling `notify_one` - OneWaiter, -} - #[derive(Debug)] struct Waiter { /// Intrusive linked-list pointers. pointers: linked_list::Pointers<Waiter>, - /// Waiting task's waker. - waker: Option<Waker>, + /// Waiting task's waker. Depending on the value of `notification`, + /// this field is either protected by the `waiters` lock in + /// `Notify`, or it is exclusively owned by the enclosing `Waiter`. + waker: UnsafeCell<Option<Waker>>, - /// `true` if the notification has been assigned to this waiter. - notified: Option<NotificationType>, + /// Notification for this waiter. + /// * if it's `None`, then `waker` is protected by the `waiters` lock. + /// * if it's `Some`, then `waker` is exclusively owned by the + /// enclosing `Waiter` and can be accessed without locking. + notification: AtomicNotification, /// Should not be `Unpin`. _p: PhantomPinned, } +impl Waiter { + fn new() -> Waiter { + Waiter { + pointers: linked_list::Pointers::new(), + waker: UnsafeCell::new(None), + notification: AtomicNotification::none(), + _p: PhantomPinned, + } + } +} + generate_addr_of_methods! { impl<> Waiter { unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> { @@ -237,6 +252,109 @@ generate_addr_of_methods! { } } +// No notification. +const NOTIFICATION_NONE: usize = 0; + +// Notification type used by `notify_one`. +const NOTIFICATION_ONE: usize = 1; + +// Notification type used by `notify_waiters`. +const NOTIFICATION_ALL: usize = 2; + +/// Notification for a `Waiter`. +/// This struct is equivalent to `Option<Notification>`, but uses +/// `AtomicUsize` inside for atomic operations. +#[derive(Debug)] +struct AtomicNotification(AtomicUsize); + +impl AtomicNotification { + fn none() -> Self { + AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE)) + } + + /// Store-release a notification. + /// This method should be called exactly once. + fn store_release(&self, notification: Notification) { + self.0.store(notification as usize, Release); + } + + fn load(&self, ordering: Ordering) -> Option<Notification> { + match self.0.load(ordering) { + NOTIFICATION_NONE => None, + NOTIFICATION_ONE => Some(Notification::One), + NOTIFICATION_ALL => Some(Notification::All), + _ => unreachable!(), + } + } + + /// Clears the notification. + /// This method is used by a `Notified` future to consume the + /// notification. It uses relaxed ordering and should be only + /// used once the atomic notification is no longer shared. + fn clear(&self) { + self.0.store(NOTIFICATION_NONE, Relaxed); + } +} + +#[derive(Debug, PartialEq, Eq)] +#[repr(usize)] +enum Notification { + One = NOTIFICATION_ONE, + All = NOTIFICATION_ALL, +} + +/// List used in `Notify::notify_waiters`. It wraps a guarded linked list +/// and gates the access to it on `notify.waiters` mutex. It also empties +/// the list on drop. +struct NotifyWaitersList<'a> { + list: GuardedWaitList, + is_empty: bool, + notify: &'a Notify, +} + +impl<'a> NotifyWaitersList<'a> { + fn new( + unguarded_list: WaitList, + guard: Pin<&'a Waiter>, + notify: &'a Notify, + ) -> NotifyWaitersList<'a> { + let guard_ptr = NonNull::from(guard.get_ref()); + let list = unguarded_list.into_guarded(guard_ptr); + NotifyWaitersList { + list, + is_empty: false, + notify, + } + } + + /// Removes the last element from the guarded list. Modifying this list + /// requires an exclusive access to the main list in `Notify`. + fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> { + let result = self.list.pop_back(); + if result.is_none() { + // Save information about emptiness to avoid waiting for lock + // in the destructor. + self.is_empty = true; + } + result + } +} + +impl Drop for NotifyWaitersList<'_> { + fn drop(&mut self) { + // If the list is not empty, we unlink all waiters from it. + // We do not wake the waiters to avoid double panics. + if !self.is_empty { + let _lock_guard = self.notify.waiters.lock(); + while let Some(waiter) = self.list.pop_back() { + // Safety: we never make mutable references to waiters. + let waiter = unsafe { waiter.as_ref() }; + waiter.notification.store_release(Notification::All); + } + } + } +} + /// Future returned from [`Notify::notified()`]. /// /// This future is fused, so once it has completed, any future calls to poll @@ -249,8 +367,11 @@ pub struct Notified<'a> { /// The current state of the receiving process. state: State, + /// Number of calls to `notify_waiters` at the time of creation. + notify_waiters_calls: usize, + /// Entry in the waiter `LinkedList`. - waiter: UnsafeCell<Waiter>, + waiter: Waiter, } unsafe impl<'a> Send for Notified<'a> {} @@ -258,7 +379,7 @@ unsafe impl<'a> Sync for Notified<'a> {} #[derive(Debug)] enum State { - Init(usize), + Init, Waiting, Done, } @@ -322,8 +443,7 @@ impl Notify { /// /// static NOTIFY: Notify = Notify::const_new(); /// ``` - #[cfg(all(feature = "parking_lot", not(all(loom, test))))] - #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + #[cfg(not(all(loom, test)))] pub const fn const_new() -> Notify { Notify { state: AtomicUsize::new(0), @@ -383,17 +503,13 @@ impl Notify { /// ``` pub fn notified(&self) -> Notified<'_> { // we load the number of times notify_waiters - // was called and store that in our initial state + // was called and store that in the future. let state = self.state.load(SeqCst); Notified { notify: self, - state: State::Init(state >> NOTIFY_WAITERS_SHIFT), - waiter: UnsafeCell::new(Waiter { - pointers: linked_list::Pointers::new(), - waker: None, - notified: None, - _p: PhantomPinned, - }), + state: State::Init, + notify_waiters_calls: get_num_notify_waiters_calls(state), + waiter: Waiter::new(), } } @@ -500,12 +616,9 @@ impl Notify { /// } /// ``` pub fn notify_waiters(&self) { - let mut wakers = WakeList::new(); - - // There are waiters, the lock must be acquired to notify. let mut waiters = self.waiters.lock(); - // The state must be reloaded while the lock is held. The state may only + // The state must be loaded while the lock is held. The state may only // transition out of WAITING while the lock is held. let curr = self.state.load(SeqCst); @@ -516,23 +629,43 @@ impl Notify { return; } - // At this point, it is guaranteed that the state will not - // concurrently change, as holding the lock is required to - // transition **out** of `WAITING`. + // Increment the number of times this method was called + // and transition to empty. + let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY); + self.state.store(new_state, SeqCst); + + // It is critical for `GuardedLinkedList` safety that the guard node is + // pinned in memory and is not dropped until the guarded list is dropped. + let guard = Waiter::new(); + pin!(guard); + + // We move all waiters to a secondary list. It uses a `GuardedLinkedList` + // underneath to allow every waiter to safely remove itself from it. + // + // * This list will be still guarded by the `waiters` lock. + // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it. + // * This wrapper will empty the list on drop. It is critical for safety + // that we will not leave any list entry with a pointer to the local + // guard node after this function returns / panics. + let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self); + + let mut wakers = WakeList::new(); 'outer: loop { while wakers.can_push() { - match waiters.pop_back() { - Some(mut waiter) => { - // Safety: `waiters` lock is still held. - let waiter = unsafe { waiter.as_mut() }; - - assert!(waiter.notified.is_none()); - - waiter.notified = Some(NotificationType::AllWaiters); - - if let Some(waker) = waiter.waker.take() { + match list.pop_back_locked(&mut waiters) { + Some(waiter) => { + // Safety: we never make mutable references to waiters. + let waiter = unsafe { waiter.as_ref() }; + + // Safety: we hold the lock, so we can access the waker. + if let Some(waker) = + unsafe { waiter.waker.with_mut(|waker| (*waker).take()) } + { wakers.push(waker); } + + // This waiter is unlinked and will not be shared ever again, release it. + waiter.notification.store_release(Notification::All); } None => { break 'outer; @@ -540,20 +673,17 @@ impl Notify { } } + // Release the lock before notifying. drop(waiters); + // One of the wakers may panic, but the remaining waiters will still + // be unlinked from the list in `NotifyWaitersList` destructor. wakers.wake_all(); // Acquire the lock again. waiters = self.waiters.lock(); } - // All waiters will be notified, the state must be transitioned to - // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be - // held, a `store` is sufficient. - let new = set_state(inc_num_notify_waiters_calls(curr), EMPTY); - self.state.store(new, SeqCst); - // Release the lock before notifying drop(waiters); @@ -592,15 +722,16 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op // transition **out** of `WAITING`. // // Get a pending waiter - let mut waiter = waiters.pop_back().unwrap(); + let waiter = waiters.pop_back().unwrap(); - // Safety: `waiters` lock is still held. - let waiter = unsafe { waiter.as_mut() }; + // Safety: we never make mutable references to waiters. + let waiter = unsafe { waiter.as_ref() }; - assert!(waiter.notified.is_none()); + // Safety: we hold the lock, so we can access the waker. + let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; - waiter.notified = Some(NotificationType::OneWaiter); - let waker = waiter.waker.take(); + // This waiter is unlinked and will not be shared ever again, release it. + waiter.notification.store_release(Notification::One); if waiters.is_empty() { // As this the **final** waiter in the list, the state @@ -730,26 +861,32 @@ impl Notified<'_> { /// A custom `project` implementation is used in place of `pin-project-lite` /// as a custom drop implementation is needed. - fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) { + fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) { unsafe { - // Safety: both `notify` and `state` are `Unpin`. + // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`. is_unpin::<&Notify>(); - is_unpin::<AtomicUsize>(); + is_unpin::<State>(); + is_unpin::<usize>(); let me = self.get_unchecked_mut(); - (me.notify, &mut me.state, &me.waiter) + ( + me.notify, + &mut me.state, + &me.notify_waiters_calls, + &me.waiter, + ) } } fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> { use State::*; - let (notify, state, waiter) = self.project(); + let (notify, state, notify_waiters_calls, waiter) = self.project(); - loop { + 'outer_loop: loop { match *state { - Init(initial_notify_waiters_calls) => { + Init => { let curr = notify.state.load(SeqCst); // Optimistically try acquiring a pending notification @@ -763,7 +900,7 @@ impl Notified<'_> { if res.is_ok() { // Acquired the notification *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } // Clone the waker before locking, a waker clone can be @@ -779,9 +916,9 @@ impl Notified<'_> { // if notify_waiters has been called after the future // was created, then we are done - if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls { + if get_num_notify_waiters_calls(curr) != *notify_waiters_calls { *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } // Transition the state to WAITING. @@ -817,7 +954,7 @@ impl Notified<'_> { Ok(_) => { // Acquired the notification *state = Done; - return Poll::Ready(()); + continue 'outer_loop; } Err(actual) => { assert_eq!(get_state(actual), EMPTY); @@ -829,52 +966,109 @@ impl Notified<'_> { } } + let mut old_waker = None; if waker.is_some() { // Safety: called while locked. + // + // The use of `old_waiter` here is not necessary, as the field is always + // None when we reach this line. unsafe { - (*waiter.get()).waker = waker; + old_waker = + waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker)); } } // Insert the waiter into the linked list - // - // safety: pointers from `UnsafeCell` are never null. - waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + waiters.push_front(NonNull::from(waiter)); *state = Waiting; + drop(waiters); + drop(old_waker); + return Poll::Pending; } Waiting => { - // Currently in the "Waiting" state, implying the caller has - // a waiter stored in the waiter list (guarded by - // `notify.waiters`). In order to access the waker fields, - // we must hold the lock. + #[cfg(tokio_taskdump)] + if let Some(waker) = waker { + let mut ctx = Context::from_waker(waker); + ready!(crate::trace::trace_leaf(&mut ctx)); + } + + if waiter.notification.load(Acquire).is_some() { + // Safety: waiter is already unlinked and will not be shared again, + // so we have an exclusive access to `waker`. + drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }); - let waiters = notify.waiters.lock(); + waiter.notification.clear(); + *state = Done; + return Poll::Ready(()); + } - // Safety: called while locked - let w = unsafe { &mut *waiter.get() }; + // Our waiter was not notified, implying it is still stored in a waiter + // list (guarded by `notify.waiters`). In order to access the waker + // fields, we must acquire the lock. + + let mut old_waker = None; + let mut waiters = notify.waiters.lock(); - if w.notified.is_some() { - // Our waker has been notified. Reset the fields and - // remove it from the list. - w.waker = None; - w.notified = None; + // We hold the lock and notifications are set only with the lock held, + // so this can be relaxed, because the happens-before relationship is + // established through the mutex. + if waiter.notification.load(Relaxed).is_some() { + // Safety: waiter is already unlinked and will not be shared again, + // so we have an exclusive access to `waker`. + old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; + + waiter.notification.clear(); + + // Drop the old waker after releasing the lock. + drop(waiters); + drop(old_waker); + + *state = Done; + return Poll::Ready(()); + } + + // Load the state with the lock held. + let curr = notify.state.load(SeqCst); + + if get_num_notify_waiters_calls(curr) != *notify_waiters_calls { + // Before we add a waiter to the list we check if these numbers are + // different while holding the lock. If these numbers are different now, + // it means that there is a call to `notify_waiters` in progress and this + // waiter must be contained by a guarded list used in `notify_waiters`. + // We can treat the waiter as notified and remove it from the list, as + // it would have been notified in the `notify_waiters` call anyways. + + // Safety: we hold the lock, so we can modify the waker. + old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }; + + // Safety: we hold the lock, so we have an exclusive access to the list. + // The list is used in `notify_waiters`, so it must be guarded. + unsafe { waiters.remove(NonNull::from(waiter)) }; *state = Done; } else { - // Update the waker, if necessary. - if let Some(waker) = waker { - let should_update = match w.waker.as_ref() { - Some(current_waker) => !current_waker.will_wake(waker), - None => true, - }; - if should_update { - w.waker = Some(waker.clone()); - } + // Safety: we hold the lock, so we can modify the waker. + unsafe { + waiter.waker.with_mut(|v| { + if let Some(waker) = waker { + let should_update = match &*v { + Some(current_waker) => !current_waker.will_wake(waker), + None => true, + }; + if should_update { + old_waker = std::mem::replace(&mut *v, Some(waker.clone())); + } + } + }); } + // Drop the old waker after releasing the lock. + drop(waiters); + drop(old_waker); + return Poll::Pending; } @@ -884,8 +1078,16 @@ impl Notified<'_> { // is helpful to visualize the scope of the critical // section. drop(waiters); + + // Drop the old waker after releasing the lock. + drop(old_waker); } Done => { + #[cfg(tokio_taskdump)] + if let Some(waker) = waker { + let mut ctx = Context::from_waker(waker); + ready!(crate::trace::trace_leaf(&mut ctx)); + } return Poll::Ready(()); } } @@ -906,7 +1108,7 @@ impl Drop for Notified<'_> { use State::*; // Safety: The type only transitions to a "Waiting" state when pinned. - let (notify, state, waiter) = unsafe { Pin::new_unchecked(self).project() }; + let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() }; // This is where we ensure safety. The `Notified` value is being // dropped, which means we must ensure that the waiter entry is no @@ -915,11 +1117,16 @@ impl Drop for Notified<'_> { let mut waiters = notify.waiters.lock(); let mut notify_state = notify.state.load(SeqCst); + // We hold the lock, so this field is not concurrently accessed by + // `notify_*` functions and we can use the relaxed ordering. + let notification = waiter.notification.load(Relaxed); + // remove the entry from the list (if not already removed) // - // safety: the waiter is only added to `waiters` by virtue of it - // being the only `LinkedList` available to the type. - unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) }; + // Safety: we hold the lock, so we have an exclusive access to every list the + // waiter may be contained in. If the node is not contained in the `waiters` + // list, then it is contained by a guarded list used by `notify_waiters`. + unsafe { waiters.remove(NonNull::from(waiter)) }; if waiters.is_empty() && get_state(notify_state) == WAITING { notify_state = set_state(notify_state, EMPTY); @@ -929,13 +1136,7 @@ impl Drop for Notified<'_> { // See if the node was notified but not received. In this case, if // the notification was triggered via `notify_one`, it must be sent // to the next waiter. - // - // Safety: with the entry removed from the linked list, there can be - // no concurrent access to the entry - if matches!( - unsafe { (*waiter.get()).notified }, - Some(NotificationType::OneWaiter) - ) { + if notification == Some(Notification::One) { if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { drop(waiters); waker.wake(); |