aboutsummaryrefslogtreecommitdiff
path: root/src/util/idle_notified_set.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/idle_notified_set.rs')
-rw-r--r--src/util/idle_notified_set.rs462
1 files changed, 462 insertions, 0 deletions
diff --git a/src/util/idle_notified_set.rs b/src/util/idle_notified_set.rs
new file mode 100644
index 0000000..ce8ff9e
--- /dev/null
+++ b/src/util/idle_notified_set.rs
@@ -0,0 +1,462 @@
+//! This module defines an `IdleNotifiedSet`, which is a collection of elements.
+//! Each element is intended to correspond to a task, and the collection will
+//! keep track of which tasks have had their waker notified, and which have not.
+//!
+//! Each entry in the set holds some user-specified value. The value's type is
+//! specified using the `T` parameter. It will usually be a `JoinHandle` or
+//! similar.
+
+use std::marker::PhantomPinned;
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::task::{Context, Waker};
+
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::{Arc, Mutex};
+use crate::util::linked_list::{self, Link};
+use crate::util::{waker_ref, Wake};
+
+type LinkedList<T> =
+ linked_list::LinkedList<ListEntry<T>, <ListEntry<T> as linked_list::Link>::Target>;
+
+/// This is the main handle to the collection.
+pub(crate) struct IdleNotifiedSet<T> {
+ lists: Arc<Lists<T>>,
+ length: usize,
+}
+
+/// A handle to an entry that is guaranteed to be stored in the idle or notified
+/// list of its `IdleNotifiedSet`. This value borrows the `IdleNotifiedSet`
+/// mutably to prevent the entry from being moved to the `Neither` list, which
+/// only the `IdleNotifiedSet` may do.
+///
+/// The main consequence of being stored in one of the lists is that the `value`
+/// field has not yet been consumed.
+///
+/// Note: This entry can be moved from the idle to the notified list while this
+/// object exists by waking its waker.
+pub(crate) struct EntryInOneOfTheLists<'a, T> {
+ entry: Arc<ListEntry<T>>,
+ set: &'a mut IdleNotifiedSet<T>,
+}
+
+type Lists<T> = Mutex<ListsInner<T>>;
+
+/// The linked lists hold strong references to the ListEntry items, and the
+/// ListEntry items also hold a strong reference back to the Lists object, but
+/// the destructor of the `IdleNotifiedSet` will clear the two lists, so once
+/// that object is destroyed, no ref-cycles will remain.
+struct ListsInner<T> {
+ notified: LinkedList<T>,
+ idle: LinkedList<T>,
+ /// Whenever an element in the `notified` list is woken, this waker will be
+ /// notified and consumed, if it exists.
+ waker: Option<Waker>,
+}
+
+/// Which of the two lists in the shared Lists object is this entry stored in?
+///
+/// If the value is `Idle`, then an entry's waker may move it to the notified
+/// list. Otherwise, only the `IdleNotifiedSet` may move it.
+///
+/// If the value is `Neither`, then it is still possible that the entry is in
+/// some third external list (this happens in `drain`).
+#[derive(Copy, Clone, Eq, PartialEq)]
+enum List {
+ Notified,
+ Idle,
+ Neither,
+}
+
+/// An entry in the list.
+///
+/// # Safety
+///
+/// The `my_list` field must only be accessed while holding the mutex in
+/// `parent`. It is an invariant that the value of `my_list` corresponds to
+/// which linked list in the `parent` holds this entry. Once this field takes
+/// the value `Neither`, then it may never be modified again.
+///
+/// If the value of `my_list` is `Notified` or `Idle`, then the `pointers` field
+/// must only be accessed while holding the mutex. If the value of `my_list` is
+/// `Neither`, then the `pointers` field may be accessed by the
+/// `IdleNotifiedSet` (this happens inside `drain`).
+///
+/// The `value` field is owned by the `IdleNotifiedSet` and may only be accessed
+/// by the `IdleNotifiedSet`. The operation that sets the value of `my_list` to
+/// `Neither` assumes ownership of the `value`, and it must either drop it or
+/// move it out from this entry to prevent it from getting leaked. (Since the
+/// two linked lists are emptied in the destructor of `IdleNotifiedSet`, the
+/// value should not be leaked.)
+struct ListEntry<T> {
+ /// The linked list pointers of the list this entry is in.
+ pointers: linked_list::Pointers<ListEntry<T>>,
+ /// Pointer to the shared `Lists` struct.
+ parent: Arc<Lists<T>>,
+ /// The value stored in this entry.
+ value: UnsafeCell<ManuallyDrop<T>>,
+ /// Used to remember which list this entry is in.
+ my_list: UnsafeCell<List>,
+ /// Required by the `linked_list::Pointers` field.
+ _pin: PhantomPinned,
+}
+
+generate_addr_of_methods! {
+ impl<T> ListEntry<T> {
+ unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
+ &self.pointers
+ }
+ }
+}
+
+// With mutable access to the `IdleNotifiedSet`, you can get mutable access to
+// the values.
+unsafe impl<T: Send> Send for IdleNotifiedSet<T> {}
+// With the current API we strictly speaking don't even need `T: Sync`, but we
+// require it anyway to support adding &self APIs that access the values in the
+// future.
+unsafe impl<T: Sync> Sync for IdleNotifiedSet<T> {}
+
+// These impls control when it is safe to create a Waker. Since the waker does
+// not allow access to the value in any way (including its destructor), it is
+// not necessary for `T` to be Send or Sync.
+unsafe impl<T> Send for ListEntry<T> {}
+unsafe impl<T> Sync for ListEntry<T> {}
+
+impl<T> IdleNotifiedSet<T> {
+ /// Create a new IdleNotifiedSet.
+ pub(crate) fn new() -> Self {
+ let lists = Mutex::new(ListsInner {
+ notified: LinkedList::new(),
+ idle: LinkedList::new(),
+ waker: None,
+ });
+
+ IdleNotifiedSet {
+ lists: Arc::new(lists),
+ length: 0,
+ }
+ }
+
+ pub(crate) fn len(&self) -> usize {
+ self.length
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ self.length == 0
+ }
+
+ /// Insert the given value into the `idle` list.
+ pub(crate) fn insert_idle(&mut self, value: T) -> EntryInOneOfTheLists<'_, T> {
+ self.length += 1;
+
+ let entry = Arc::new(ListEntry {
+ parent: self.lists.clone(),
+ value: UnsafeCell::new(ManuallyDrop::new(value)),
+ my_list: UnsafeCell::new(List::Idle),
+ pointers: linked_list::Pointers::new(),
+ _pin: PhantomPinned,
+ });
+
+ {
+ let mut lock = self.lists.lock();
+ lock.idle.push_front(entry.clone());
+ }
+
+ // Safety: We just put the entry in the idle list, so it is in one of the lists.
+ EntryInOneOfTheLists { entry, set: self }
+ }
+
+ /// Pop an entry from the notified list to poll it. The entry is moved to
+ /// the idle list atomically.
+ pub(crate) fn pop_notified(&mut self, waker: &Waker) -> Option<EntryInOneOfTheLists<'_, T>> {
+ // We don't decrement the length because this call moves the entry to
+ // the idle list rather than removing it.
+ if self.length == 0 {
+ // Fast path.
+ return None;
+ }
+
+ let mut lock = self.lists.lock();
+
+ let should_update_waker = match lock.waker.as_mut() {
+ Some(cur_waker) => !waker.will_wake(cur_waker),
+ None => true,
+ };
+ if should_update_waker {
+ lock.waker = Some(waker.clone());
+ }
+
+ // Pop the entry, returning None if empty.
+ let entry = lock.notified.pop_back()?;
+
+ lock.idle.push_front(entry.clone());
+
+ // Safety: We are holding the lock.
+ entry.my_list.with_mut(|ptr| unsafe {
+ *ptr = List::Idle;
+ });
+
+ drop(lock);
+
+ // Safety: We just put the entry in the idle list, so it is in one of the lists.
+ Some(EntryInOneOfTheLists { entry, set: self })
+ }
+
+ /// Call a function on every element in this list.
+ pub(crate) fn for_each<F: FnMut(&mut T)>(&mut self, mut func: F) {
+ fn get_ptrs<T>(list: &mut LinkedList<T>, ptrs: &mut Vec<*mut T>) {
+ let mut node = list.last();
+
+ while let Some(entry) = node {
+ ptrs.push(entry.value.with_mut(|ptr| {
+ let ptr: *mut ManuallyDrop<T> = ptr;
+ let ptr: *mut T = ptr.cast();
+ ptr
+ }));
+
+ let prev = entry.pointers.get_prev();
+ node = prev.map(|prev| unsafe { &*prev.as_ptr() });
+ }
+ }
+
+ // Atomically get a raw pointer to the value of every entry.
+ //
+ // Since this only locks the mutex once, it is not possible for a value
+ // to get moved from the idle list to the notified list during the
+ // operation, which would otherwise result in some value being listed
+ // twice.
+ let mut ptrs = Vec::with_capacity(self.len());
+ {
+ let mut lock = self.lists.lock();
+
+ get_ptrs(&mut lock.idle, &mut ptrs);
+ get_ptrs(&mut lock.notified, &mut ptrs);
+ }
+ debug_assert_eq!(ptrs.len(), ptrs.capacity());
+
+ for ptr in ptrs {
+ // Safety: When we grabbed the pointers, the entries were in one of
+ // the two lists. This means that their value was valid at the time,
+ // and it must still be valid because we are the IdleNotifiedSet,
+ // and only we can remove an entry from the two lists. (It's
+ // possible that an entry is moved from one list to the other during
+ // this loop, but that is ok.)
+ func(unsafe { &mut *ptr });
+ }
+ }
+
+ /// Remove all entries in both lists, applying some function to each element.
+ ///
+ /// The closure is called on all elements even if it panics. Having it panic
+ /// twice is a double-panic, and will abort the application.
+ pub(crate) fn drain<F: FnMut(T)>(&mut self, func: F) {
+ if self.length == 0 {
+ // Fast path.
+ return;
+ }
+ self.length = 0;
+
+ // The LinkedList is not cleared on panic, so we use a bomb to clear it.
+ //
+ // This value has the invariant that any entry in its `all_entries` list
+ // has `my_list` set to `Neither` and that the value has not yet been
+ // dropped.
+ struct AllEntries<T, F: FnMut(T)> {
+ all_entries: LinkedList<T>,
+ func: F,
+ }
+
+ impl<T, F: FnMut(T)> AllEntries<T, F> {
+ fn pop_next(&mut self) -> bool {
+ if let Some(entry) = self.all_entries.pop_back() {
+ // Safety: We just took this value from the list, so we can
+ // destroy the value in the entry.
+ entry
+ .value
+ .with_mut(|ptr| unsafe { (self.func)(ManuallyDrop::take(&mut *ptr)) });
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ impl<T, F: FnMut(T)> Drop for AllEntries<T, F> {
+ fn drop(&mut self) {
+ while self.pop_next() {}
+ }
+ }
+
+ let mut all_entries = AllEntries {
+ all_entries: LinkedList::new(),
+ func,
+ };
+
+ // Atomically move all entries to the new linked list in the AllEntries
+ // object.
+ {
+ let mut lock = self.lists.lock();
+ unsafe {
+ // Safety: We are holding the lock and `all_entries` is a new
+ // LinkedList.
+ move_to_new_list(&mut lock.idle, &mut all_entries.all_entries);
+ move_to_new_list(&mut lock.notified, &mut all_entries.all_entries);
+ }
+ }
+
+ // Keep destroying entries in the list until it is empty.
+ //
+ // If the closure panics, then the destructor of the `AllEntries` bomb
+ // ensures that we keep running the destructor on the remaining values.
+ // A second panic will abort the program.
+ while all_entries.pop_next() {}
+ }
+}
+
+/// # Safety
+///
+/// The mutex for the entries must be held, and the target list must be such
+/// that setting `my_list` to `Neither` is ok.
+unsafe fn move_to_new_list<T>(from: &mut LinkedList<T>, to: &mut LinkedList<T>) {
+ while let Some(entry) = from.pop_back() {
+ entry.my_list.with_mut(|ptr| {
+ *ptr = List::Neither;
+ });
+ to.push_front(entry);
+ }
+}
+
+impl<'a, T> EntryInOneOfTheLists<'a, T> {
+ /// Remove this entry from the list it is in, returning the value associated
+ /// with the entry.
+ ///
+ /// This consumes the value, since it is no longer guaranteed to be in a
+ /// list.
+ pub(crate) fn remove(self) -> T {
+ self.set.length -= 1;
+
+ {
+ let mut lock = self.set.lists.lock();
+
+ // Safety: We are holding the lock so there is no race, and we will
+ // remove the entry afterwards to uphold invariants.
+ let old_my_list = self.entry.my_list.with_mut(|ptr| unsafe {
+ let old_my_list = *ptr;
+ *ptr = List::Neither;
+ old_my_list
+ });
+
+ let list = match old_my_list {
+ List::Idle => &mut lock.idle,
+ List::Notified => &mut lock.notified,
+ // An entry in one of the lists is in one of the lists.
+ List::Neither => unreachable!(),
+ };
+
+ unsafe {
+ // Safety: We just checked that the entry is in this particular
+ // list.
+ list.remove(ListEntry::as_raw(&self.entry)).unwrap();
+ }
+ }
+
+ // By setting `my_list` to `Neither`, we have taken ownership of the
+ // value. We return it to the caller.
+ //
+ // Safety: We have a mutable reference to the `IdleNotifiedSet` that
+ // owns this entry, so we can use its permission to access the value.
+ self.entry
+ .value
+ .with_mut(|ptr| unsafe { ManuallyDrop::take(&mut *ptr) })
+ }
+
+ /// Access the value in this entry together with a context for its waker.
+ pub(crate) fn with_value_and_context<F, U>(&mut self, func: F) -> U
+ where
+ F: FnOnce(&mut T, &mut Context<'_>) -> U,
+ T: 'static,
+ {
+ let waker = waker_ref(&self.entry);
+
+ let mut context = Context::from_waker(&waker);
+
+ // Safety: We have a mutable reference to the `IdleNotifiedSet` that
+ // owns this entry, so we can use its permission to access the value.
+ self.entry
+ .value
+ .with_mut(|ptr| unsafe { func(&mut *ptr, &mut context) })
+ }
+}
+
+impl<T> Drop for IdleNotifiedSet<T> {
+ fn drop(&mut self) {
+ // Clear both lists.
+ self.drain(drop);
+
+ #[cfg(debug_assertions)]
+ if !std::thread::panicking() {
+ let lock = self.lists.lock();
+ assert!(lock.idle.is_empty());
+ assert!(lock.notified.is_empty());
+ }
+ }
+}
+
+impl<T: 'static> Wake for ListEntry<T> {
+ fn wake_by_ref(me: &Arc<Self>) {
+ let mut lock = me.parent.lock();
+
+ // Safety: We are holding the lock and we will update the lists to
+ // maintain invariants.
+ let old_my_list = me.my_list.with_mut(|ptr| unsafe {
+ let old_my_list = *ptr;
+ if old_my_list == List::Idle {
+ *ptr = List::Notified;
+ }
+ old_my_list
+ });
+
+ if old_my_list == List::Idle {
+ // We move ourself to the notified list.
+ let me = unsafe {
+ // Safety: We just checked that we are in this particular list.
+ lock.idle.remove(NonNull::from(&**me)).unwrap()
+ };
+ lock.notified.push_front(me);
+
+ if let Some(waker) = lock.waker.take() {
+ drop(lock);
+ waker.wake();
+ }
+ }
+ }
+
+ fn wake(me: Arc<Self>) {
+ Self::wake_by_ref(&me)
+ }
+}
+
+/// # Safety
+///
+/// `ListEntry` is forced to be !Unpin.
+unsafe impl<T> linked_list::Link for ListEntry<T> {
+ type Handle = Arc<ListEntry<T>>;
+ type Target = ListEntry<T>;
+
+ fn as_raw(handle: &Self::Handle) -> NonNull<ListEntry<T>> {
+ let ptr: *const ListEntry<T> = Arc::as_ptr(handle);
+ // Safety: We can't get a null pointer from `Arc::as_ptr`.
+ unsafe { NonNull::new_unchecked(ptr as *mut ListEntry<T>) }
+ }
+
+ unsafe fn from_raw(ptr: NonNull<ListEntry<T>>) -> Arc<ListEntry<T>> {
+ Arc::from_raw(ptr.as_ptr())
+ }
+
+ unsafe fn pointers(
+ target: NonNull<ListEntry<T>>,
+ ) -> NonNull<linked_list::Pointers<ListEntry<T>>> {
+ ListEntry::addr_of_pointers(target)
+ }
+}