aboutsummaryrefslogtreecommitdiff
path: root/src/sync/notify.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/notify.rs')
-rw-r--r--src/sync/notify.rs409
1 files changed, 305 insertions, 104 deletions
diff --git a/src/sync/notify.rs b/src/sync/notify.rs
index efe16f9..bf00ca3 100644
--- a/src/sync/notify.rs
+++ b/src/sync/notify.rs
@@ -5,21 +5,22 @@
// triggers this warning but it is safe to ignore in this case.
#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
+use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
-use crate::util::linked_list::{self, LinkedList};
+use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
use crate::util::WakeList;
-use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::NonNull;
-use std::sync::atomic::Ordering::SeqCst;
+use std::sync::atomic::Ordering::{self, Acquire, Relaxed, Release, SeqCst};
use std::task::{Context, Poll, Waker};
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
+type GuardedWaitList = GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// Notifies a single task to wake up.
///
@@ -198,37 +199,51 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
- // This uses 2 bits to store one of `EMPTY`,
+ // `state` uses 2 bits to store one of `EMPTY`,
// `WAITING` or `NOTIFIED`. The rest of the bits
// are used to store the number of times `notify_waiters`
// was called.
+ //
+ // Throughout the code there are two assumptions:
+ // - state can be transitioned *from* `WAITING` only if
+ // `waiters` lock is held
+ // - number of times `notify_waiters` was called can
+ // be modified only if `waiters` lock is held
state: AtomicUsize,
waiters: Mutex<WaitList>,
}
-#[derive(Debug, Clone, Copy)]
-enum NotificationType {
- // Notification triggered by calling `notify_waiters`
- AllWaiters,
- // Notification triggered by calling `notify_one`
- OneWaiter,
-}
-
#[derive(Debug)]
struct Waiter {
/// Intrusive linked-list pointers.
pointers: linked_list::Pointers<Waiter>,
- /// Waiting task's waker.
- waker: Option<Waker>,
+ /// Waiting task's waker. Depending on the value of `notification`,
+ /// this field is either protected by the `waiters` lock in
+ /// `Notify`, or it is exclusively owned by the enclosing `Waiter`.
+ waker: UnsafeCell<Option<Waker>>,
- /// `true` if the notification has been assigned to this waiter.
- notified: Option<NotificationType>,
+ /// Notification for this waiter.
+ /// * if it's `None`, then `waker` is protected by the `waiters` lock.
+ /// * if it's `Some`, then `waker` is exclusively owned by the
+ /// enclosing `Waiter` and can be accessed without locking.
+ notification: AtomicNotification,
/// Should not be `Unpin`.
_p: PhantomPinned,
}
+impl Waiter {
+ fn new() -> Waiter {
+ Waiter {
+ pointers: linked_list::Pointers::new(),
+ waker: UnsafeCell::new(None),
+ notification: AtomicNotification::none(),
+ _p: PhantomPinned,
+ }
+ }
+}
+
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
@@ -237,6 +252,109 @@ generate_addr_of_methods! {
}
}
+// No notification.
+const NOTIFICATION_NONE: usize = 0;
+
+// Notification type used by `notify_one`.
+const NOTIFICATION_ONE: usize = 1;
+
+// Notification type used by `notify_waiters`.
+const NOTIFICATION_ALL: usize = 2;
+
+/// Notification for a `Waiter`.
+/// This struct is equivalent to `Option<Notification>`, but uses
+/// `AtomicUsize` inside for atomic operations.
+#[derive(Debug)]
+struct AtomicNotification(AtomicUsize);
+
+impl AtomicNotification {
+ fn none() -> Self {
+ AtomicNotification(AtomicUsize::new(NOTIFICATION_NONE))
+ }
+
+ /// Store-release a notification.
+ /// This method should be called exactly once.
+ fn store_release(&self, notification: Notification) {
+ self.0.store(notification as usize, Release);
+ }
+
+ fn load(&self, ordering: Ordering) -> Option<Notification> {
+ match self.0.load(ordering) {
+ NOTIFICATION_NONE => None,
+ NOTIFICATION_ONE => Some(Notification::One),
+ NOTIFICATION_ALL => Some(Notification::All),
+ _ => unreachable!(),
+ }
+ }
+
+ /// Clears the notification.
+ /// This method is used by a `Notified` future to consume the
+ /// notification. It uses relaxed ordering and should be only
+ /// used once the atomic notification is no longer shared.
+ fn clear(&self) {
+ self.0.store(NOTIFICATION_NONE, Relaxed);
+ }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+#[repr(usize)]
+enum Notification {
+ One = NOTIFICATION_ONE,
+ All = NOTIFICATION_ALL,
+}
+
+/// List used in `Notify::notify_waiters`. It wraps a guarded linked list
+/// and gates the access to it on `notify.waiters` mutex. It also empties
+/// the list on drop.
+struct NotifyWaitersList<'a> {
+ list: GuardedWaitList,
+ is_empty: bool,
+ notify: &'a Notify,
+}
+
+impl<'a> NotifyWaitersList<'a> {
+ fn new(
+ unguarded_list: WaitList,
+ guard: Pin<&'a Waiter>,
+ notify: &'a Notify,
+ ) -> NotifyWaitersList<'a> {
+ let guard_ptr = NonNull::from(guard.get_ref());
+ let list = unguarded_list.into_guarded(guard_ptr);
+ NotifyWaitersList {
+ list,
+ is_empty: false,
+ notify,
+ }
+ }
+
+ /// Removes the last element from the guarded list. Modifying this list
+ /// requires an exclusive access to the main list in `Notify`.
+ fn pop_back_locked(&mut self, _waiters: &mut WaitList) -> Option<NonNull<Waiter>> {
+ let result = self.list.pop_back();
+ if result.is_none() {
+ // Save information about emptiness to avoid waiting for lock
+ // in the destructor.
+ self.is_empty = true;
+ }
+ result
+ }
+}
+
+impl Drop for NotifyWaitersList<'_> {
+ fn drop(&mut self) {
+ // If the list is not empty, we unlink all waiters from it.
+ // We do not wake the waiters to avoid double panics.
+ if !self.is_empty {
+ let _lock_guard = self.notify.waiters.lock();
+ while let Some(waiter) = self.list.pop_back() {
+ // Safety: we never make mutable references to waiters.
+ let waiter = unsafe { waiter.as_ref() };
+ waiter.notification.store_release(Notification::All);
+ }
+ }
+ }
+}
+
/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
@@ -249,8 +367,11 @@ pub struct Notified<'a> {
/// The current state of the receiving process.
state: State,
+ /// Number of calls to `notify_waiters` at the time of creation.
+ notify_waiters_calls: usize,
+
/// Entry in the waiter `LinkedList`.
- waiter: UnsafeCell<Waiter>,
+ waiter: Waiter,
}
unsafe impl<'a> Send for Notified<'a> {}
@@ -258,7 +379,7 @@ unsafe impl<'a> Sync for Notified<'a> {}
#[derive(Debug)]
enum State {
- Init(usize),
+ Init,
Waiting,
Done,
}
@@ -322,8 +443,7 @@ impl Notify {
///
/// static NOTIFY: Notify = Notify::const_new();
/// ```
- #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
- #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
+ #[cfg(not(all(loom, test)))]
pub const fn const_new() -> Notify {
Notify {
state: AtomicUsize::new(0),
@@ -383,17 +503,13 @@ impl Notify {
/// ```
pub fn notified(&self) -> Notified<'_> {
// we load the number of times notify_waiters
- // was called and store that in our initial state
+ // was called and store that in the future.
let state = self.state.load(SeqCst);
Notified {
notify: self,
- state: State::Init(state >> NOTIFY_WAITERS_SHIFT),
- waiter: UnsafeCell::new(Waiter {
- pointers: linked_list::Pointers::new(),
- waker: None,
- notified: None,
- _p: PhantomPinned,
- }),
+ state: State::Init,
+ notify_waiters_calls: get_num_notify_waiters_calls(state),
+ waiter: Waiter::new(),
}
}
@@ -500,12 +616,9 @@ impl Notify {
/// }
/// ```
pub fn notify_waiters(&self) {
- let mut wakers = WakeList::new();
-
- // There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock();
- // The state must be reloaded while the lock is held. The state may only
+ // The state must be loaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
let curr = self.state.load(SeqCst);
@@ -516,23 +629,43 @@ impl Notify {
return;
}
- // At this point, it is guaranteed that the state will not
- // concurrently change, as holding the lock is required to
- // transition **out** of `WAITING`.
+ // Increment the number of times this method was called
+ // and transition to empty.
+ let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
+ self.state.store(new_state, SeqCst);
+
+ // It is critical for `GuardedLinkedList` safety that the guard node is
+ // pinned in memory and is not dropped until the guarded list is dropped.
+ let guard = Waiter::new();
+ pin!(guard);
+
+ // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
+ // underneath to allow every waiter to safely remove itself from it.
+ //
+ // * This list will be still guarded by the `waiters` lock.
+ // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
+ // * This wrapper will empty the list on drop. It is critical for safety
+ // that we will not leave any list entry with a pointer to the local
+ // guard node after this function returns / panics.
+ let mut list = NotifyWaitersList::new(std::mem::take(&mut *waiters), guard.as_ref(), self);
+
+ let mut wakers = WakeList::new();
'outer: loop {
while wakers.can_push() {
- match waiters.pop_back() {
- Some(mut waiter) => {
- // Safety: `waiters` lock is still held.
- let waiter = unsafe { waiter.as_mut() };
-
- assert!(waiter.notified.is_none());
-
- waiter.notified = Some(NotificationType::AllWaiters);
-
- if let Some(waker) = waiter.waker.take() {
+ match list.pop_back_locked(&mut waiters) {
+ Some(waiter) => {
+ // Safety: we never make mutable references to waiters.
+ let waiter = unsafe { waiter.as_ref() };
+
+ // Safety: we hold the lock, so we can access the waker.
+ if let Some(waker) =
+ unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }
+ {
wakers.push(waker);
}
+
+ // This waiter is unlinked and will not be shared ever again, release it.
+ waiter.notification.store_release(Notification::All);
}
None => {
break 'outer;
@@ -540,20 +673,17 @@ impl Notify {
}
}
+ // Release the lock before notifying.
drop(waiters);
+ // One of the wakers may panic, but the remaining waiters will still
+ // be unlinked from the list in `NotifyWaitersList` destructor.
wakers.wake_all();
// Acquire the lock again.
waiters = self.waiters.lock();
}
- // All waiters will be notified, the state must be transitioned to
- // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
- // held, a `store` is sufficient.
- let new = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
- self.state.store(new, SeqCst);
-
// Release the lock before notifying
drop(waiters);
@@ -592,15 +722,16 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
// transition **out** of `WAITING`.
//
// Get a pending waiter
- let mut waiter = waiters.pop_back().unwrap();
+ let waiter = waiters.pop_back().unwrap();
- // Safety: `waiters` lock is still held.
- let waiter = unsafe { waiter.as_mut() };
+ // Safety: we never make mutable references to waiters.
+ let waiter = unsafe { waiter.as_ref() };
- assert!(waiter.notified.is_none());
+ // Safety: we hold the lock, so we can access the waker.
+ let waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
- waiter.notified = Some(NotificationType::OneWaiter);
- let waker = waiter.waker.take();
+ // This waiter is unlinked and will not be shared ever again, release it.
+ waiter.notification.store_release(Notification::One);
if waiters.is_empty() {
// As this the **final** waiter in the list, the state
@@ -730,26 +861,32 @@ impl Notified<'_> {
/// A custom `project` implementation is used in place of `pin-project-lite`
/// as a custom drop implementation is needed.
- fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) {
+ fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &usize, &Waiter) {
unsafe {
- // Safety: both `notify` and `state` are `Unpin`.
+ // Safety: `notify`, `state` and `notify_waiters_calls` are `Unpin`.
is_unpin::<&Notify>();
- is_unpin::<AtomicUsize>();
+ is_unpin::<State>();
+ is_unpin::<usize>();
let me = self.get_unchecked_mut();
- (me.notify, &mut me.state, &me.waiter)
+ (
+ me.notify,
+ &mut me.state,
+ &me.notify_waiters_calls,
+ &me.waiter,
+ )
}
}
fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
use State::*;
- let (notify, state, waiter) = self.project();
+ let (notify, state, notify_waiters_calls, waiter) = self.project();
- loop {
+ 'outer_loop: loop {
match *state {
- Init(initial_notify_waiters_calls) => {
+ Init => {
let curr = notify.state.load(SeqCst);
// Optimistically try acquiring a pending notification
@@ -763,7 +900,7 @@ impl Notified<'_> {
if res.is_ok() {
// Acquired the notification
*state = Done;
- return Poll::Ready(());
+ continue 'outer_loop;
}
// Clone the waker before locking, a waker clone can be
@@ -779,9 +916,9 @@ impl Notified<'_> {
// if notify_waiters has been called after the future
// was created, then we are done
- if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls {
+ if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
*state = Done;
- return Poll::Ready(());
+ continue 'outer_loop;
}
// Transition the state to WAITING.
@@ -817,7 +954,7 @@ impl Notified<'_> {
Ok(_) => {
// Acquired the notification
*state = Done;
- return Poll::Ready(());
+ continue 'outer_loop;
}
Err(actual) => {
assert_eq!(get_state(actual), EMPTY);
@@ -829,52 +966,109 @@ impl Notified<'_> {
}
}
+ let mut old_waker = None;
if waker.is_some() {
// Safety: called while locked.
+ //
+ // The use of `old_waiter` here is not necessary, as the field is always
+ // None when we reach this line.
unsafe {
- (*waiter.get()).waker = waker;
+ old_waker =
+ waiter.waker.with_mut(|v| std::mem::replace(&mut *v, waker));
}
}
// Insert the waiter into the linked list
- //
- // safety: pointers from `UnsafeCell` are never null.
- waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
+ waiters.push_front(NonNull::from(waiter));
*state = Waiting;
+ drop(waiters);
+ drop(old_waker);
+
return Poll::Pending;
}
Waiting => {
- // Currently in the "Waiting" state, implying the caller has
- // a waiter stored in the waiter list (guarded by
- // `notify.waiters`). In order to access the waker fields,
- // we must hold the lock.
+ #[cfg(tokio_taskdump)]
+ if let Some(waker) = waker {
+ let mut ctx = Context::from_waker(waker);
+ ready!(crate::trace::trace_leaf(&mut ctx));
+ }
+
+ if waiter.notification.load(Acquire).is_some() {
+ // Safety: waiter is already unlinked and will not be shared again,
+ // so we have an exclusive access to `waker`.
+ drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
- let waiters = notify.waiters.lock();
+ waiter.notification.clear();
+ *state = Done;
+ return Poll::Ready(());
+ }
- // Safety: called while locked
- let w = unsafe { &mut *waiter.get() };
+ // Our waiter was not notified, implying it is still stored in a waiter
+ // list (guarded by `notify.waiters`). In order to access the waker
+ // fields, we must acquire the lock.
+
+ let mut old_waker = None;
+ let mut waiters = notify.waiters.lock();
- if w.notified.is_some() {
- // Our waker has been notified. Reset the fields and
- // remove it from the list.
- w.waker = None;
- w.notified = None;
+ // We hold the lock and notifications are set only with the lock held,
+ // so this can be relaxed, because the happens-before relationship is
+ // established through the mutex.
+ if waiter.notification.load(Relaxed).is_some() {
+ // Safety: waiter is already unlinked and will not be shared again,
+ // so we have an exclusive access to `waker`.
+ old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
+
+ waiter.notification.clear();
+
+ // Drop the old waker after releasing the lock.
+ drop(waiters);
+ drop(old_waker);
+
+ *state = Done;
+ return Poll::Ready(());
+ }
+
+ // Load the state with the lock held.
+ let curr = notify.state.load(SeqCst);
+
+ if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
+ // Before we add a waiter to the list we check if these numbers are
+ // different while holding the lock. If these numbers are different now,
+ // it means that there is a call to `notify_waiters` in progress and this
+ // waiter must be contained by a guarded list used in `notify_waiters`.
+ // We can treat the waiter as notified and remove it from the list, as
+ // it would have been notified in the `notify_waiters` call anyways.
+
+ // Safety: we hold the lock, so we can modify the waker.
+ old_waker = unsafe { waiter.waker.with_mut(|waker| (*waker).take()) };
+
+ // Safety: we hold the lock, so we have an exclusive access to the list.
+ // The list is used in `notify_waiters`, so it must be guarded.
+ unsafe { waiters.remove(NonNull::from(waiter)) };
*state = Done;
} else {
- // Update the waker, if necessary.
- if let Some(waker) = waker {
- let should_update = match w.waker.as_ref() {
- Some(current_waker) => !current_waker.will_wake(waker),
- None => true,
- };
- if should_update {
- w.waker = Some(waker.clone());
- }
+ // Safety: we hold the lock, so we can modify the waker.
+ unsafe {
+ waiter.waker.with_mut(|v| {
+ if let Some(waker) = waker {
+ let should_update = match &*v {
+ Some(current_waker) => !current_waker.will_wake(waker),
+ None => true,
+ };
+ if should_update {
+ old_waker = std::mem::replace(&mut *v, Some(waker.clone()));
+ }
+ }
+ });
}
+ // Drop the old waker after releasing the lock.
+ drop(waiters);
+ drop(old_waker);
+
return Poll::Pending;
}
@@ -884,8 +1078,16 @@ impl Notified<'_> {
// is helpful to visualize the scope of the critical
// section.
drop(waiters);
+
+ // Drop the old waker after releasing the lock.
+ drop(old_waker);
}
Done => {
+ #[cfg(tokio_taskdump)]
+ if let Some(waker) = waker {
+ let mut ctx = Context::from_waker(waker);
+ ready!(crate::trace::trace_leaf(&mut ctx));
+ }
return Poll::Ready(());
}
}
@@ -906,7 +1108,7 @@ impl Drop for Notified<'_> {
use State::*;
// Safety: The type only transitions to a "Waiting" state when pinned.
- let (notify, state, waiter) = unsafe { Pin::new_unchecked(self).project() };
+ let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
// This is where we ensure safety. The `Notified` value is being
// dropped, which means we must ensure that the waiter entry is no
@@ -915,11 +1117,16 @@ impl Drop for Notified<'_> {
let mut waiters = notify.waiters.lock();
let mut notify_state = notify.state.load(SeqCst);
+ // We hold the lock, so this field is not concurrently accessed by
+ // `notify_*` functions and we can use the relaxed ordering.
+ let notification = waiter.notification.load(Relaxed);
+
// remove the entry from the list (if not already removed)
//
- // safety: the waiter is only added to `waiters` by virtue of it
- // being the only `LinkedList` available to the type.
- unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };
+ // Safety: we hold the lock, so we have an exclusive access to every list the
+ // waiter may be contained in. If the node is not contained in the `waiters`
+ // list, then it is contained by a guarded list used by `notify_waiters`.
+ unsafe { waiters.remove(NonNull::from(waiter)) };
if waiters.is_empty() && get_state(notify_state) == WAITING {
notify_state = set_state(notify_state, EMPTY);
@@ -929,13 +1136,7 @@ impl Drop for Notified<'_> {
// See if the node was notified but not received. In this case, if
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
- //
- // Safety: with the entry removed from the linked list, there can be
- // no concurrent access to the entry
- if matches!(
- unsafe { (*waiter.get()).notified },
- Some(NotificationType::OneWaiter)
- ) {
+ if notification == Some(Notification::One) {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();