diff options
Diffstat (limited to 'src/sync/notify.rs')
-rw-r--r-- | src/sync/notify.rs | 152 |
1 files changed, 116 insertions, 36 deletions
diff --git a/src/sync/notify.rs b/src/sync/notify.rs index 5cb41e8..922f109 100644 --- a/src/sync/notify.rs +++ b/src/sync/notify.rs @@ -1,3 +1,10 @@ +// Allow `unreachable_pub` warnings when sync is not enabled +// due to the usage of `Notify` within the `rt` feature set. +// When this module is compiled with `sync` enabled we will warn on +// this lint. When `rt` is enabled we use `pub(crate)` which +// triggers this warning but it is safe to ignore in this case. +#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] + use crate::loom::sync::atomic::AtomicU8; use crate::loom::sync::Mutex; use crate::util::linked_list::{self, LinkedList}; @@ -10,6 +17,8 @@ use std::ptr::NonNull; use std::sync::atomic::Ordering::SeqCst; use std::task::{Context, Poll, Waker}; +type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; + /// Notify a single task to wake up. /// /// `Notify` provides a basic mechanism to notify a single task of an event. @@ -17,20 +26,20 @@ use std::task::{Context, Poll, Waker}; /// another task to perform an operation. /// /// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. -/// [`notified().await`] waits for a permit to become available, and [`notify()`] +/// [`notified().await`] waits for a permit to become available, and [`notify_one()`] /// sets a permit **if there currently are no available permits**. /// /// The synchronization details of `Notify` are similar to /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] /// value contains a single permit. [`notified().await`] waits for the permit to -/// be made available, consumes the permit, and resumes. [`notify()`] sets the +/// be made available, consumes the permit, and resumes. [`notify_one()`] sets the /// permit, waking a pending task if there is one. /// -/// If `notify()` is called **before** `notfied().await`, then the next call to +/// If `notify_one()` is called **before** `notified().await`, then the next call to /// `notified().await` will complete immediately, consuming the permit. Any /// subsequent calls to `notified().await` will wait for a new permit. /// -/// If `notify()` is called **multiple** times before `notified().await`, only a +/// If `notify_one()` is called **multiple** times before `notified().await`, only a /// **single** permit is stored. The next call to `notified().await` will /// complete immediately, but the one after will wait for a new permit. /// @@ -53,7 +62,7 @@ use std::task::{Context, Poll, Waker}; /// }); /// /// println!("sending notification"); -/// notify.notify(); +/// notify.notify_one(); /// } /// ``` /// @@ -76,7 +85,7 @@ use std::task::{Context, Poll, Waker}; /// .push_back(value); /// /// // Notify the consumer a value is available -/// self.notify.notify(); +/// self.notify.notify_one(); /// } /// /// pub async fn recv(&self) -> T { @@ -96,12 +105,20 @@ use std::task::{Context, Poll, Waker}; /// [park]: std::thread::park /// [unpark]: std::thread::Thread::unpark /// [`notified().await`]: Notify::notified() -/// [`notify()`]: Notify::notify() +/// [`notify_one()`]: Notify::notify_one() /// [`Semaphore`]: crate::sync::Semaphore #[derive(Debug)] pub struct Notify { state: AtomicU8, - waiters: Mutex<LinkedList<Waiter>>, + waiters: Mutex<WaitList>, +} + +#[derive(Debug, Clone, Copy)] +enum NotificationType { + // Notification triggered by calling `notify_waiters` + AllWaiters, + // Notification triggered by calling `notify_one` + OneWaiter, } #[derive(Debug)] @@ -113,7 +130,7 @@ struct Waiter { waker: Option<Waker>, /// `true` if the notification has been assigned to this waiter. - notified: bool, + notified: Option<NotificationType>, /// Should not be `Unpin`. _p: PhantomPinned, @@ -121,7 +138,7 @@ struct Waiter { /// Future returned from `notified()` #[derive(Debug)] -struct Notified<'a> { +pub struct Notified<'a> { /// The `Notify` being received on. notify: &'a Notify, @@ -168,14 +185,38 @@ impl Notify { } } + /// Create a new `Notify`, initialized without a permit. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// + /// static NOTIFY: Notify = Notify::const_new(); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new() -> Notify { + Notify { + state: AtomicU8::new(0), + waiters: Mutex::const_new(LinkedList::new()), + } + } + /// Wait for a notification. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn notified(&self); + /// ``` + /// /// Each `Notify` value holds a single permit. If a permit is available from - /// an earlier call to [`notify()`], then `notified().await` will complete + /// an earlier call to [`notify_one()`], then `notified().await` will complete /// immediately, consuming that permit. Otherwise, `notified().await` waits - /// for a permit to be made available by the next call to `notify()`. + /// for a permit to be made available by the next call to `notify_one()`. /// - /// [`notify()`]: Notify::notify + /// [`notify_one()`]: Notify::notify_one /// /// # Examples /// @@ -194,21 +235,20 @@ impl Notify { /// }); /// /// println!("sending notification"); - /// notify.notify(); + /// notify.notify_one(); /// } /// ``` - pub async fn notified(&self) { + pub fn notified(&self) -> Notified<'_> { Notified { notify: self, state: State::Init, waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - notified: false, + notified: None, _p: PhantomPinned, }), } - .await } /// Notifies a waiting task @@ -216,10 +256,10 @@ impl Notify { /// If a task is currently waiting, that task is notified. Otherwise, a /// permit is stored in this `Notify` value and the **next** call to /// [`notified().await`] will complete immediately consuming the permit made - /// available by this call to `notify()`. + /// available by this call to `notify_one()`. /// /// At most one permit may be stored by `Notify`. Many sequential calls to - /// `notify` will result in a single permit being stored. The next call to + /// `notify_one` will result in a single permit being stored. The next call to /// `notified().await` will complete immediately, but the one after that /// will wait. /// @@ -242,10 +282,10 @@ impl Notify { /// }); /// /// println!("sending notification"); - /// notify.notify(); + /// notify.notify_one(); /// } /// ``` - pub fn notify(&self) { + pub fn notify_one(&self) { // Load the current state let mut curr = self.state.load(SeqCst); @@ -266,7 +306,7 @@ impl Notify { } // There are waiters, the lock must be acquired to notify. - let mut waiters = self.waiters.lock().unwrap(); + let mut waiters = self.waiters.lock(); // The state must be reloaded while the lock is held. The state may only // transition out of WAITING while the lock is held. @@ -277,6 +317,45 @@ impl Notify { waker.wake(); } } + + /// Notifies all waiting tasks + pub(crate) fn notify_waiters(&self) { + // There are waiters, the lock must be acquired to notify. + let mut waiters = self.waiters.lock(); + + // The state must be reloaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + let curr = self.state.load(SeqCst); + + if let EMPTY | NOTIFIED = curr { + // There are no waiting tasks. In this case, no synchronization is + // established between `notify` and `notified().await`. + return; + } + + // At this point, it is guaranteed that the state will not + // concurrently change, as holding the lock is required to + // transition **out** of `WAITING`. + // + // Get pending waiters + while let Some(mut waiter) = waiters.pop_back() { + // Safety: `waiters` lock is still held. + let waiter = unsafe { waiter.as_mut() }; + + assert!(waiter.notified.is_none()); + + waiter.notified = Some(NotificationType::AllWaiters); + + if let Some(waker) = waiter.waker.take() { + waker.wake(); + } + } + + // All waiters have been notified, the state must be transitioned to + // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be + // held, a `store` is sufficient. + self.state.store(EMPTY, SeqCst); + } } impl Default for Notify { @@ -285,7 +364,7 @@ impl Default for Notify { } } -fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -> Option<Waker> { +fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<Waker> { loop { match curr { EMPTY | NOTIFIED => { @@ -311,9 +390,9 @@ fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) - // Safety: `waiters` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(!waiter.notified); + assert!(waiter.notified.is_none()); - waiter.notified = true; + waiter.notified = Some(NotificationType::OneWaiter); let waker = waiter.waker.take(); if waiters.is_empty() { @@ -373,7 +452,7 @@ impl Future for Notified<'_> { // Acquire the lock and attempt to transition to the waiting // state. - let mut waiters = notify.waiters.lock().unwrap(); + let mut waiters = notify.waiters.lock(); // Reload the state with the lock held let mut curr = notify.state.load(SeqCst); @@ -428,6 +507,8 @@ impl Future for Notified<'_> { waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); *state = Waiting; + + return Poll::Pending; } Waiting => { // Currently in the "Waiting" state, implying the caller has @@ -435,16 +516,16 @@ impl Future for Notified<'_> { // `notify.waiters`). In order to access the waker fields, // we must hold the lock. - let waiters = notify.waiters.lock().unwrap(); + let waiters = notify.waiters.lock(); // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.notified { + if w.notified.is_some() { // Our waker has been notified. Reset the fields and // remove it from the list. w.waker = None; - w.notified = false; + w.notified = None; *state = Done; } else { @@ -483,12 +564,12 @@ impl Drop for Notified<'_> { // longer stored in the linked list. if let Waiting = *state { let mut notify_state = WAITING; - let mut waiters = notify.waiters.lock().unwrap(); + let mut waiters = notify.waiters.lock(); // `Notify.state` may be in any of the three states (Empty, Waiting, // Notified). It doesn't actually matter what the atomic is set to // at this point. We hold the lock and will ensure the atomic is in - // the correct state once th elock is dropped. + // the correct state once the lock is dropped. // // Because the atomic state is not checked, at first glance, it may // seem like this routine does not handle the case where the @@ -516,14 +597,13 @@ impl Drop for Notified<'_> { notify.state.store(EMPTY, SeqCst); } - // See if the node was notified but not received. In this case, the - // notification must be sent to another waiter. + // See if the node was notified but not received. In this case, if + // the notification was triggered via `notify_one`, it must be sent + // to the next waiter. // // Safety: with the entry removed from the linked list, there can be // no concurrent access to the entry - let notified = unsafe { (*waiter.get()).notified }; - - if notified { + if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } { if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { drop(waiters); waker.wake(); |