aboutsummaryrefslogtreecommitdiff
path: root/src/sync/notify.rs
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2020-10-23 09:39:31 -0700
committerJoel Galenson <jgalenson@google.com>2020-10-23 09:52:09 -0700
commitd5495b03381a3ebe0805db353d198b285b535b5c (patch)
tree778b8524d15fca8b73db0253ee0e1919d0848bb6 /src/sync/notify.rs
parentba45c5bedf31df8562364c61d3dfb5262f10642e (diff)
downloadtokio-d5495b03381a3ebe0805db353d198b285b535b5c.tar.gz
Update to tokio-0.3.1 and add new features
Test: Build Change-Id: I5b5b9b386a21982a019653d0cf0bd3afc505cfac
Diffstat (limited to 'src/sync/notify.rs')
-rw-r--r--src/sync/notify.rs152
1 files changed, 116 insertions, 36 deletions
diff --git a/src/sync/notify.rs b/src/sync/notify.rs
index 5cb41e8..922f109 100644
--- a/src/sync/notify.rs
+++ b/src/sync/notify.rs
@@ -1,3 +1,10 @@
+// Allow `unreachable_pub` warnings when sync is not enabled
+// due to the usage of `Notify` within the `rt` feature set.
+// When this module is compiled with `sync` enabled we will warn on
+// this lint. When `rt` is enabled we use `pub(crate)` which
+// triggers this warning but it is safe to ignore in this case.
+#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
+
use crate::loom::sync::atomic::AtomicU8;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};
@@ -10,6 +17,8 @@ use std::ptr::NonNull;
use std::sync::atomic::Ordering::SeqCst;
use std::task::{Context, Poll, Waker};
+type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
+
/// Notify a single task to wake up.
///
/// `Notify` provides a basic mechanism to notify a single task of an event.
@@ -17,20 +26,20 @@ use std::task::{Context, Poll, Waker};
/// another task to perform an operation.
///
/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits.
-/// [`notified().await`] waits for a permit to become available, and [`notify()`]
+/// [`notified().await`] waits for a permit to become available, and [`notify_one()`]
/// sets a permit **if there currently are no available permits**.
///
/// The synchronization details of `Notify` are similar to
/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
/// value contains a single permit. [`notified().await`] waits for the permit to
-/// be made available, consumes the permit, and resumes. [`notify()`] sets the
+/// be made available, consumes the permit, and resumes. [`notify_one()`] sets the
/// permit, waking a pending task if there is one.
///
-/// If `notify()` is called **before** `notfied().await`, then the next call to
+/// If `notify_one()` is called **before** `notified().await`, then the next call to
/// `notified().await` will complete immediately, consuming the permit. Any
/// subsequent calls to `notified().await` will wait for a new permit.
///
-/// If `notify()` is called **multiple** times before `notified().await`, only a
+/// If `notify_one()` is called **multiple** times before `notified().await`, only a
/// **single** permit is stored. The next call to `notified().await` will
/// complete immediately, but the one after will wait for a new permit.
///
@@ -53,7 +62,7 @@ use std::task::{Context, Poll, Waker};
/// });
///
/// println!("sending notification");
-/// notify.notify();
+/// notify.notify_one();
/// }
/// ```
///
@@ -76,7 +85,7 @@ use std::task::{Context, Poll, Waker};
/// .push_back(value);
///
/// // Notify the consumer a value is available
-/// self.notify.notify();
+/// self.notify.notify_one();
/// }
///
/// pub async fn recv(&self) -> T {
@@ -96,12 +105,20 @@ use std::task::{Context, Poll, Waker};
/// [park]: std::thread::park
/// [unpark]: std::thread::Thread::unpark
/// [`notified().await`]: Notify::notified()
-/// [`notify()`]: Notify::notify()
+/// [`notify_one()`]: Notify::notify_one()
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
state: AtomicU8,
- waiters: Mutex<LinkedList<Waiter>>,
+ waiters: Mutex<WaitList>,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum NotificationType {
+ // Notification triggered by calling `notify_waiters`
+ AllWaiters,
+ // Notification triggered by calling `notify_one`
+ OneWaiter,
}
#[derive(Debug)]
@@ -113,7 +130,7 @@ struct Waiter {
waker: Option<Waker>,
/// `true` if the notification has been assigned to this waiter.
- notified: bool,
+ notified: Option<NotificationType>,
/// Should not be `Unpin`.
_p: PhantomPinned,
@@ -121,7 +138,7 @@ struct Waiter {
/// Future returned from `notified()`
#[derive(Debug)]
-struct Notified<'a> {
+pub struct Notified<'a> {
/// The `Notify` being received on.
notify: &'a Notify,
@@ -168,14 +185,38 @@ impl Notify {
}
}
+ /// Create a new `Notify`, initialized without a permit.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Notify;
+ ///
+ /// static NOTIFY: Notify = Notify::const_new();
+ /// ```
+ #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
+ pub const fn const_new() -> Notify {
+ Notify {
+ state: AtomicU8::new(0),
+ waiters: Mutex::const_new(LinkedList::new()),
+ }
+ }
+
/// Wait for a notification.
///
+ /// Equivalent to:
+ ///
+ /// ```ignore
+ /// async fn notified(&self);
+ /// ```
+ ///
/// Each `Notify` value holds a single permit. If a permit is available from
- /// an earlier call to [`notify()`], then `notified().await` will complete
+ /// an earlier call to [`notify_one()`], then `notified().await` will complete
/// immediately, consuming that permit. Otherwise, `notified().await` waits
- /// for a permit to be made available by the next call to `notify()`.
+ /// for a permit to be made available by the next call to `notify_one()`.
///
- /// [`notify()`]: Notify::notify
+ /// [`notify_one()`]: Notify::notify_one
///
/// # Examples
///
@@ -194,21 +235,20 @@ impl Notify {
/// });
///
/// println!("sending notification");
- /// notify.notify();
+ /// notify.notify_one();
/// }
/// ```
- pub async fn notified(&self) {
+ pub fn notified(&self) -> Notified<'_> {
Notified {
notify: self,
state: State::Init,
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
- notified: false,
+ notified: None,
_p: PhantomPinned,
}),
}
- .await
}
/// Notifies a waiting task
@@ -216,10 +256,10 @@ impl Notify {
/// If a task is currently waiting, that task is notified. Otherwise, a
/// permit is stored in this `Notify` value and the **next** call to
/// [`notified().await`] will complete immediately consuming the permit made
- /// available by this call to `notify()`.
+ /// available by this call to `notify_one()`.
///
/// At most one permit may be stored by `Notify`. Many sequential calls to
- /// `notify` will result in a single permit being stored. The next call to
+ /// `notify_one` will result in a single permit being stored. The next call to
/// `notified().await` will complete immediately, but the one after that
/// will wait.
///
@@ -242,10 +282,10 @@ impl Notify {
/// });
///
/// println!("sending notification");
- /// notify.notify();
+ /// notify.notify_one();
/// }
/// ```
- pub fn notify(&self) {
+ pub fn notify_one(&self) {
// Load the current state
let mut curr = self.state.load(SeqCst);
@@ -266,7 +306,7 @@ impl Notify {
}
// There are waiters, the lock must be acquired to notify.
- let mut waiters = self.waiters.lock().unwrap();
+ let mut waiters = self.waiters.lock();
// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
@@ -277,6 +317,45 @@ impl Notify {
waker.wake();
}
}
+
+ /// Notifies all waiting tasks
+ pub(crate) fn notify_waiters(&self) {
+ // There are waiters, the lock must be acquired to notify.
+ let mut waiters = self.waiters.lock();
+
+ // The state must be reloaded while the lock is held. The state may only
+ // transition out of WAITING while the lock is held.
+ let curr = self.state.load(SeqCst);
+
+ if let EMPTY | NOTIFIED = curr {
+ // There are no waiting tasks. In this case, no synchronization is
+ // established between `notify` and `notified().await`.
+ return;
+ }
+
+ // At this point, it is guaranteed that the state will not
+ // concurrently change, as holding the lock is required to
+ // transition **out** of `WAITING`.
+ //
+ // Get pending waiters
+ while let Some(mut waiter) = waiters.pop_back() {
+ // Safety: `waiters` lock is still held.
+ let waiter = unsafe { waiter.as_mut() };
+
+ assert!(waiter.notified.is_none());
+
+ waiter.notified = Some(NotificationType::AllWaiters);
+
+ if let Some(waker) = waiter.waker.take() {
+ waker.wake();
+ }
+ }
+
+ // All waiters have been notified, the state must be transitioned to
+ // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
+ // held, a `store` is sufficient.
+ self.state.store(EMPTY, SeqCst);
+ }
}
impl Default for Notify {
@@ -285,7 +364,7 @@ impl Default for Notify {
}
}
-fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -> Option<Waker> {
+fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<Waker> {
loop {
match curr {
EMPTY | NOTIFIED => {
@@ -311,9 +390,9 @@ fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };
- assert!(!waiter.notified);
+ assert!(waiter.notified.is_none());
- waiter.notified = true;
+ waiter.notified = Some(NotificationType::OneWaiter);
let waker = waiter.waker.take();
if waiters.is_empty() {
@@ -373,7 +452,7 @@ impl Future for Notified<'_> {
// Acquire the lock and attempt to transition to the waiting
// state.
- let mut waiters = notify.waiters.lock().unwrap();
+ let mut waiters = notify.waiters.lock();
// Reload the state with the lock held
let mut curr = notify.state.load(SeqCst);
@@ -428,6 +507,8 @@ impl Future for Notified<'_> {
waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
*state = Waiting;
+
+ return Poll::Pending;
}
Waiting => {
// Currently in the "Waiting" state, implying the caller has
@@ -435,16 +516,16 @@ impl Future for Notified<'_> {
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.
- let waiters = notify.waiters.lock().unwrap();
+ let waiters = notify.waiters.lock();
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
- if w.notified {
+ if w.notified.is_some() {
// Our waker has been notified. Reset the fields and
// remove it from the list.
w.waker = None;
- w.notified = false;
+ w.notified = None;
*state = Done;
} else {
@@ -483,12 +564,12 @@ impl Drop for Notified<'_> {
// longer stored in the linked list.
if let Waiting = *state {
let mut notify_state = WAITING;
- let mut waiters = notify.waiters.lock().unwrap();
+ let mut waiters = notify.waiters.lock();
// `Notify.state` may be in any of the three states (Empty, Waiting,
// Notified). It doesn't actually matter what the atomic is set to
// at this point. We hold the lock and will ensure the atomic is in
- // the correct state once th elock is dropped.
+ // the correct state once the lock is dropped.
//
// Because the atomic state is not checked, at first glance, it may
// seem like this routine does not handle the case where the
@@ -516,14 +597,13 @@ impl Drop for Notified<'_> {
notify.state.store(EMPTY, SeqCst);
}
- // See if the node was notified but not received. In this case, the
- // notification must be sent to another waiter.
+ // See if the node was notified but not received. In this case, if
+ // the notification was triggered via `notify_one`, it must be sent
+ // to the next waiter.
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
- let notified = unsafe { (*waiter.get()).notified };
-
- if notified {
+ if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();