path: root/src/debt/list.rs
diff options
Diffstat (limited to 'src/debt/list.rs')
1 files changed, 346 insertions, 0 deletions
diff --git a/src/debt/list.rs b/src/debt/list.rs
new file mode 100644
index 0000000..3d17388
--- /dev/null
+++ b/src/debt/list.rs
@@ -0,0 +1,346 @@
+//! A linked list of debt nodes.
+//! A node may or may not be owned by a thread. Reader debts are allocated in its owned node,
+//! writer walks everything (but may also use some owned values).
+//! The list is prepend-only ‒ if thread dies, the node lives on (and can be claimed by another
+//! thread later on). This makes the implementation much simpler, since everything here is
+//! `'static` and we don't have to care about knowing when to free stuff.
+//! The nodes contain both the fast primary slots and a secondary fallback ones.
+//! # Synchronization
+//! We synchronize several things here.
+//! The addition of nodes is synchronized through the head (Load on each read, AcqReal on each
+//! attempt to add another node). Note that certain parts never change after that (they aren't even
+//! atomic) and other things that do change take care of themselves (the debt slots have their own
+//! synchronization, etc).
+//! The ownership is acquire-release lock pattern.
+//! Similar, the counting of active writers is an acquire-release lock pattern.
+//! We also do release-acquire "send" from the start-cooldown to check-cooldown to make sure we see
+//! at least as up to date value of the writers as when the cooldown started. That we if we see 0,
+//! we know it must have happened since then.
+use std::cell::Cell;
+use std::ptr;
+use std::slice::Iter;
+use std::sync::atomic::Ordering::*;
+use std::sync::atomic::{AtomicPtr, AtomicUsize};
+use super::fast::{Local as FastLocal, Slots as FastSlots};
+use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
+use super::Debt;
+use crate::RefCnt;
+const NODE_UNUSED: usize = 0;
+const NODE_USED: usize = 1;
+const NODE_COOLDOWN: usize = 2;
+/// The head of the debt linked list.
+static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
+pub struct NodeReservation<'a>(&'a Node);
+impl Drop for NodeReservation<'_> {
+ fn drop(&mut self) {
+ self.0.active_writers.fetch_sub(1, Release);
+ }
+/// One thread-local node for debts.
+#[repr(C, align(64))]
+pub(crate) struct Node {
+ fast: FastSlots,
+ helping: HelpingSlots,
+ in_use: AtomicUsize,
+ // Next node in the list.
+ //
+ // It is a pointer because we touch it before synchronization (we don't _dereference_ it before
+ // synchronization, only manipulate the pointer itself). That is illegal according to strict
+ // interpretation of the rules by MIRI on references.
+ next: *const Node,
+ active_writers: AtomicUsize,
+impl Default for Node {
+ fn default() -> Self {
+ Node {
+ fast: FastSlots::default(),
+ helping: HelpingSlots::default(),
+ in_use: AtomicUsize::new(NODE_USED),
+ next: ptr::null(),
+ active_writers: AtomicUsize::new(0),
+ }
+ }
+impl Node {
+ /// Goes through the debt linked list.
+ ///
+ /// This traverses the linked list, calling the closure on each node. If the closure returns
+ /// `Some`, it terminates with that value early, otherwise it runs to the end.
+ pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
+ // Acquire ‒ we want to make sure we read the correct version of data at the end of the
+ // pointer. Any write to the DEBT_HEAD is with Release.
+ //
+ // Furthermore, we need to see the newest version of the list in case we examine the debts
+ // - if a new one is added recently, we don't want a stale read -> SeqCst.
+ //
+ // Note that the other pointers in the chain never change and are *ordinary* pointers. The
+ // whole linked list is synchronized through the head.
+ let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
+ while let Some(node) = current {
+ let result = f(node);
+ if result.is_some() {
+ return result;
+ }
+ current = unsafe { node.next.as_ref() };
+ }
+ None
+ }
+ /// Put the current thread node into cooldown
+ fn start_cooldown(&self) {
+ // Trick: Make sure we have an up to date value of the active_writers in this thread, so we
+ // can properly release it below.
+ let _reservation = self.reserve_writer();
+ assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
+ }
+ /// Perform a cooldown if the node is ready.
+ ///
+ /// See the ABA protection at the [helping].
+ fn check_cooldown(&self) {
+ // Check if the node is in cooldown, for two reasons:
+ // * Skip most of nodes fast, without dealing with them.
+ // * More importantly, sync the value of active_writers to be at least the value when the
+ // cooldown started. That way we know the 0 we observe happened some time after
+ // start_cooldown.
+ if self.in_use.load(Acquire) == NODE_COOLDOWN {
+ // The rest can be nicely relaxed ‒ no memory is being synchronized by these
+ // operations. We just see an up to date 0 and allow someone (possibly us) to claim the
+ // node later on.
+ if self.active_writers.load(Relaxed) == 0 {
+ let _ = self
+ .in_use
+ .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
+ }
+ }
+ }
+ /// Mark this node that a writer is currently playing with it.
+ pub fn reserve_writer(&self) -> NodeReservation {
+ self.active_writers.fetch_add(1, Acquire);
+ NodeReservation(self)
+ }
+ /// "Allocate" a node.
+ ///
+ /// Either a new one is created, or previous one is reused. The node is claimed to become
+ /// in_use.
+ fn get() -> &'static Self {
+ // Try to find an unused one in the chain and reuse it.
+ Self::traverse(|node| {
+ node.check_cooldown();
+ if node
+ .in_use
+ // We claim a unique control over the generation and the right to write to slots if
+ // they are NO_DEPT
+ .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
+ .is_ok()
+ {
+ Some(node)
+ } else {
+ None
+ }
+ })
+ // If that didn't work, create a new one and prepend to the list.
+ .unwrap_or_else(|| {
+ let node = Box::leak(Box::<Node>::default());
+ node.helping.init();
+ // We don't want to read any data in addition to the head, Relaxed is fine
+ // here.
+ //
+ // We do need to release the data to others, but for that, we acquire in the
+ // compare_exchange below.
+ let mut head = LIST_HEAD.load(Relaxed);
+ loop {
+ node.next = head;
+ if let Err(old) = LIST_HEAD.compare_exchange_weak(
+ head, node,
+ // We need to release *the whole chain* here. For that, we need to
+ // acquire it first.
+ //
+ // SeqCst because we need to make sure it is properly set "before" we do
+ // anything to the debts.
+ SeqCst, Relaxed, // Nothing changed, go next round of the loop.
+ ) {
+ head = old;
+ } else {
+ return node;
+ }
+ }
+ })
+ }
+ /// Iterate over the fast slots.
+ pub(crate) fn fast_slots(&self) -> Iter<Debt> {
+ self.fast.into_iter()
+ }
+ /// Access the helping slot.
+ pub(crate) fn helping_slot(&self) -> &Debt {
+ self.helping.slot()
+ }
+/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
+pub(crate) struct LocalNode {
+ /// Node for this thread, if any.
+ ///
+ /// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
+ node: Cell<Option<&'static Node>>,
+ /// Thread-local data for the fast slots.
+ fast: FastLocal,
+ /// Thread local data for the helping strategy.
+ helping: HelpingLocal,
+impl LocalNode {
+ pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
+ let f = Cell::new(Some(f));
+ .try_with(|head| {
+ if head.node.get().is_none() {
+ head.node.set(Some(Node::get()));
+ }
+ let f = f.take().unwrap();
+ f(head)
+ })
+ // During the application shutdown, the thread local storage may be already
+ // deallocated. In that case, the above fails but we still need something. So we just
+ // find or allocate a node and use it just once.
+ //
+ // Note that the situation should be very very rare and not happen often, so the slower
+ // performance doesn't matter that much.
+ .unwrap_or_else(|_| {
+ let tmp_node = LocalNode {
+ node: Cell::new(Some(Node::get())),
+ fast: FastLocal::default(),
+ helping: HelpingLocal::default(),
+ };
+ let f = f.take().unwrap();
+ f(&tmp_node)
+ // Drop of tmp_node -> sends the node we just used into cooldown.
+ })
+ }
+ /// Creates a new debt.
+ ///
+ /// This stores the debt of the given pointer (untyped, casted into an usize) and returns a
+ /// reference to that slot, or gives up with `None` if all the slots are currently full.
+ #[inline]
+ pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
+ let node = &self.node.get().expect("LocalNode::with ensures it is set");
+ debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
+ node.fast.get_debt(ptr, &self.fast)
+ }
+ /// Initializes a helping slot transaction.
+ ///
+ /// Returns the generation (with tag).
+ pub(crate) fn new_helping(&self, ptr: usize) -> usize {
+ let node = &self.node.get().expect("LocalNode::with ensures it is set");
+ debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
+ let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
+ if discard {
+ // Too many generations happened, make sure the writers give the poor node a break for
+ // a while so they don't observe the generation wrapping around.
+ node.start_cooldown();
+ self.node.take();
+ }
+ gen
+ }
+ /// Confirm the helping transaction.
+ ///
+ /// The generation comes from previous new_helping.
+ ///
+ /// Will either return a debt with the pointer, or a debt to pay and a replacement (already
+ /// protected) address.
+ pub(crate) fn confirm_helping(
+ &self,
+ gen: usize,
+ ptr: usize,
+ ) -> Result<&'static Debt, (&'static Debt, usize)> {
+ let node = &self.node.get().expect("LocalNode::with ensures it is set");
+ debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
+ let slot = node.helping_slot();
+ node.helping
+ .confirm(gen, ptr)
+ .map(|()| slot)
+ .map_err(|repl| (slot, repl))
+ }
+ /// The writer side of a helping slot.
+ ///
+ /// This potentially helps the `who` node (uses self as the local node, which must be
+ /// different) by loading the address that one is trying to load.
+ pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
+ where
+ T: RefCnt,
+ R: Fn() -> T,
+ {
+ let node = &self.node.get().expect("LocalNode::with ensures it is set");
+ debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
+ node.helping.help(&who.helping, storage_addr, replacement)
+ }
+impl Drop for LocalNode {
+ fn drop(&mut self) {
+ if let Some(node) = self.node.get() {
+ // Release - syncing writes/ownership of this Node
+ node.start_cooldown();
+ }
+ }
+thread_local! {
+ /// A debt node assigned to this thread.
+ static THREAD_HEAD: LocalNode = LocalNode {
+ node: Cell::new(None),
+ fast: FastLocal::default(),
+ helping: HelpingLocal::default(),
+ };
+mod tests {
+ use super::*;
+ impl Node {
+ fn is_empty(&self) -> bool {
+ self.fast_slots()
+ .chain(std::iter::once(self.helping_slot()))
+ .all(|d| d.0.load(Relaxed) == Debt::NONE)
+ }
+ fn get_thread() -> &'static Self {
+ LocalNode::with(|h| h.node.get().unwrap())
+ }
+ }
+ /// A freshly acquired thread local node is empty.
+ #[test]
+ fn new_empty() {
+ assert!(Node::get_thread().is_empty());
+ }