| //! A linked list of debt nodes. |
| //! |
| //! A node may or may not be owned by a thread. Reader debts are allocated in its owned node, |
| //! writer walks everything (but may also use some owned values). |
| //! |
| //! The list is prepend-only ‒ if thread dies, the node lives on (and can be claimed by another |
| //! thread later on). This makes the implementation much simpler, since everything here is |
| //! `'static` and we don't have to care about knowing when to free stuff. |
| //! |
| //! The nodes contain both the fast primary slots and a secondary fallback ones. |
| //! |
| //! # Synchronization |
| //! |
| //! We synchronize several things here. |
| //! |
| //! The addition of nodes is synchronized through the head (Load on each read, AcqReal on each |
| //! attempt to add another node). Note that certain parts never change after that (they aren't even |
| //! atomic) and other things that do change take care of themselves (the debt slots have their own |
| //! synchronization, etc). |
| //! |
| //! The ownership is acquire-release lock pattern. |
| //! |
| //! Similar, the counting of active writers is an acquire-release lock pattern. |
| //! |
| //! We also do release-acquire "send" from the start-cooldown to check-cooldown to make sure we see |
| //! at least as up to date value of the writers as when the cooldown started. That we if we see 0, |
| //! we know it must have happened since then. |
| |
| use core::cell::Cell; |
| use core::ptr; |
| use core::slice::Iter; |
| use core::sync::atomic::Ordering::*; |
| use core::sync::atomic::{AtomicPtr, AtomicUsize}; |
| |
| #[cfg(feature = "experimental-thread-local")] |
| use core::cell::OnceCell; |
| |
| use alloc::boxed::Box; |
| |
| use super::fast::{Local as FastLocal, Slots as FastSlots}; |
| use super::helping::{Local as HelpingLocal, Slots as HelpingSlots}; |
| use super::Debt; |
| use crate::RefCnt; |
| |
| const NODE_UNUSED: usize = 0; |
| const NODE_USED: usize = 1; |
| const NODE_COOLDOWN: usize = 2; |
| |
| /// The head of the debt linked list. |
| static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut()); |
| |
| pub struct NodeReservation<'a>(&'a Node); |
| |
| impl Drop for NodeReservation<'_> { |
| fn drop(&mut self) { |
| self.0.active_writers.fetch_sub(1, Release); |
| } |
| } |
| |
| /// One thread-local node for debts. |
| #[repr(C, align(64))] |
| pub(crate) struct Node { |
| fast: FastSlots, |
| helping: HelpingSlots, |
| in_use: AtomicUsize, |
| // Next node in the list. |
| // |
| // It is a pointer because we touch it before synchronization (we don't _dereference_ it before |
| // synchronization, only manipulate the pointer itself). That is illegal according to strict |
| // interpretation of the rules by MIRI on references. |
| next: *const Node, |
| active_writers: AtomicUsize, |
| } |
| |
| impl Default for Node { |
| fn default() -> Self { |
| Node { |
| fast: FastSlots::default(), |
| helping: HelpingSlots::default(), |
| in_use: AtomicUsize::new(NODE_USED), |
| next: ptr::null(), |
| active_writers: AtomicUsize::new(0), |
| } |
| } |
| } |
| |
| impl Node { |
| /// Goes through the debt linked list. |
| /// |
| /// This traverses the linked list, calling the closure on each node. If the closure returns |
| /// `Some`, it terminates with that value early, otherwise it runs to the end. |
| pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> { |
| // Acquire ‒ we want to make sure we read the correct version of data at the end of the |
| // pointer. Any write to the DEBT_HEAD is with Release. |
| // |
| // Furthermore, we need to see the newest version of the list in case we examine the debts |
| // - if a new one is added recently, we don't want a stale read -> SeqCst. |
| // |
| // Note that the other pointers in the chain never change and are *ordinary* pointers. The |
| // whole linked list is synchronized through the head. |
| let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() }; |
| while let Some(node) = current { |
| let result = f(node); |
| if result.is_some() { |
| return result; |
| } |
| current = unsafe { node.next.as_ref() }; |
| } |
| None |
| } |
| |
| /// Put the current thread node into cooldown |
| fn start_cooldown(&self) { |
| // Trick: Make sure we have an up to date value of the active_writers in this thread, so we |
| // can properly release it below. |
| let _reservation = self.reserve_writer(); |
| assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release)); |
| } |
| |
| /// Perform a cooldown if the node is ready. |
| /// |
| /// See the ABA protection at the [helping]. |
| fn check_cooldown(&self) { |
| // Check if the node is in cooldown, for two reasons: |
| // * Skip most of nodes fast, without dealing with them. |
| // * More importantly, sync the value of active_writers to be at least the value when the |
| // cooldown started. That way we know the 0 we observe happened some time after |
| // start_cooldown. |
| if self.in_use.load(Acquire) == NODE_COOLDOWN { |
| // The rest can be nicely relaxed ‒ no memory is being synchronized by these |
| // operations. We just see an up to date 0 and allow someone (possibly us) to claim the |
| // node later on. |
| if self.active_writers.load(Relaxed) == 0 { |
| let _ = self |
| .in_use |
| .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed); |
| } |
| } |
| } |
| |
| /// Mark this node that a writer is currently playing with it. |
| pub fn reserve_writer(&self) -> NodeReservation { |
| self.active_writers.fetch_add(1, Acquire); |
| NodeReservation(self) |
| } |
| |
| /// "Allocate" a node. |
| /// |
| /// Either a new one is created, or previous one is reused. The node is claimed to become |
| /// in_use. |
| fn get() -> &'static Self { |
| // Try to find an unused one in the chain and reuse it. |
| Self::traverse(|node| { |
| node.check_cooldown(); |
| if node |
| .in_use |
| // We claim a unique control over the generation and the right to write to slots if |
| // they are NO_DEPT |
| .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed) |
| .is_ok() |
| { |
| Some(node) |
| } else { |
| None |
| } |
| }) |
| // If that didn't work, create a new one and prepend to the list. |
| .unwrap_or_else(|| { |
| let node = Box::leak(Box::<Node>::default()); |
| node.helping.init(); |
| // We don't want to read any data in addition to the head, Relaxed is fine |
| // here. |
| // |
| // We do need to release the data to others, but for that, we acquire in the |
| // compare_exchange below. |
| let mut head = LIST_HEAD.load(Relaxed); |
| loop { |
| node.next = head; |
| if let Err(old) = LIST_HEAD.compare_exchange_weak( |
| head, node, |
| // We need to release *the whole chain* here. For that, we need to |
| // acquire it first. |
| // |
| // SeqCst because we need to make sure it is properly set "before" we do |
| // anything to the debts. |
| SeqCst, Relaxed, // Nothing changed, go next round of the loop. |
| ) { |
| head = old; |
| } else { |
| return node; |
| } |
| } |
| }) |
| } |
| |
| /// Iterate over the fast slots. |
| pub(crate) fn fast_slots(&self) -> Iter<Debt> { |
| self.fast.into_iter() |
| } |
| |
| /// Access the helping slot. |
| pub(crate) fn helping_slot(&self) -> &Debt { |
| self.helping.slot() |
| } |
| } |
| |
| /// A wrapper around a node pointer, to un-claim the node on thread shutdown. |
| pub(crate) struct LocalNode { |
| /// Node for this thread, if any. |
| /// |
| /// We don't necessarily have to own one, but if we don't, we'll get one before the first use. |
| node: Cell<Option<&'static Node>>, |
| |
| /// Thread-local data for the fast slots. |
| fast: FastLocal, |
| |
| /// Thread local data for the helping strategy. |
| helping: HelpingLocal, |
| } |
| |
| impl LocalNode { |
| #[cfg(not(feature = "experimental-thread-local"))] |
| pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R { |
| let f = Cell::new(Some(f)); |
| THREAD_HEAD |
| .try_with(|head| { |
| if head.node.get().is_none() { |
| head.node.set(Some(Node::get())); |
| } |
| let f = f.take().unwrap(); |
| f(head) |
| }) |
| // During the application shutdown, the thread local storage may be already |
| // deallocated. In that case, the above fails but we still need something. So we just |
| // find or allocate a node and use it just once. |
| // |
| // Note that the situation should be very very rare and not happen often, so the slower |
| // performance doesn't matter that much. |
| .unwrap_or_else(|_| { |
| let tmp_node = LocalNode { |
| node: Cell::new(Some(Node::get())), |
| fast: FastLocal::default(), |
| helping: HelpingLocal::default(), |
| }; |
| let f = f.take().unwrap(); |
| f(&tmp_node) |
| // Drop of tmp_node -> sends the node we just used into cooldown. |
| }) |
| } |
| |
| #[cfg(feature = "experimental-thread-local")] |
| pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R { |
| let thread_head = THREAD_HEAD.get_or_init(|| LocalNode { |
| node: Cell::new(None), |
| fast: FastLocal::default(), |
| helping: HelpingLocal::default(), |
| }); |
| if thread_head.node.get().is_none() { |
| thread_head.node.set(Some(Node::get())); |
| } |
| f(&thread_head) |
| } |
| |
| /// Creates a new debt. |
| /// |
| /// This stores the debt of the given pointer (untyped, casted into an usize) and returns a |
| /// reference to that slot, or gives up with `None` if all the slots are currently full. |
| #[inline] |
| pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> { |
| let node = &self.node.get().expect("LocalNode::with ensures it is set"); |
| debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED); |
| node.fast.get_debt(ptr, &self.fast) |
| } |
| |
| /// Initializes a helping slot transaction. |
| /// |
| /// Returns the generation (with tag). |
| pub(crate) fn new_helping(&self, ptr: usize) -> usize { |
| let node = &self.node.get().expect("LocalNode::with ensures it is set"); |
| debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED); |
| let (gen, discard) = node.helping.get_debt(ptr, &self.helping); |
| if discard { |
| // Too many generations happened, make sure the writers give the poor node a break for |
| // a while so they don't observe the generation wrapping around. |
| node.start_cooldown(); |
| self.node.take(); |
| } |
| gen |
| } |
| |
| /// Confirm the helping transaction. |
| /// |
| /// The generation comes from previous new_helping. |
| /// |
| /// Will either return a debt with the pointer, or a debt to pay and a replacement (already |
| /// protected) address. |
| pub(crate) fn confirm_helping( |
| &self, |
| gen: usize, |
| ptr: usize, |
| ) -> Result<&'static Debt, (&'static Debt, usize)> { |
| let node = &self.node.get().expect("LocalNode::with ensures it is set"); |
| debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED); |
| let slot = node.helping_slot(); |
| node.helping |
| .confirm(gen, ptr) |
| .map(|()| slot) |
| .map_err(|repl| (slot, repl)) |
| } |
| |
| /// The writer side of a helping slot. |
| /// |
| /// This potentially helps the `who` node (uses self as the local node, which must be |
| /// different) by loading the address that one is trying to load. |
| pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R) |
| where |
| T: RefCnt, |
| R: Fn() -> T, |
| { |
| let node = &self.node.get().expect("LocalNode::with ensures it is set"); |
| debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED); |
| node.helping.help(&who.helping, storage_addr, replacement) |
| } |
| } |
| |
| impl Drop for LocalNode { |
| fn drop(&mut self) { |
| if let Some(node) = self.node.get() { |
| // Release - syncing writes/ownership of this Node |
| node.start_cooldown(); |
| } |
| } |
| } |
| |
| #[cfg(not(feature = "experimental-thread-local"))] |
| thread_local! { |
| /// A debt node assigned to this thread. |
| static THREAD_HEAD: LocalNode = LocalNode { |
| node: Cell::new(None), |
| fast: FastLocal::default(), |
| helping: HelpingLocal::default(), |
| }; |
| } |
| |
| #[cfg(feature = "experimental-thread-local")] |
| #[thread_local] |
| /// A debt node assigned to this thread. |
| static THREAD_HEAD: OnceCell<LocalNode> = OnceCell::new(); |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| impl Node { |
| fn is_empty(&self) -> bool { |
| self.fast_slots() |
| .chain(core::iter::once(self.helping_slot())) |
| .all(|d| d.0.load(Relaxed) == Debt::NONE) |
| } |
| |
| fn get_thread() -> &'static Self { |
| LocalNode::with(|h| h.node.get().unwrap()) |
| } |
| } |
| |
| /// A freshly acquired thread local node is empty. |
| #[test] |
| fn new_empty() { |
| assert!(Node::get_thread().is_empty()); |
| } |
| } |