use alloc::alloc::{alloc_zeroed, handle_alloc_error, Layout};
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of values a block can hold. The last index in each lap
// does not map to a slot; it marks the end of a block.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Indicates that the block is not the last one.
const HAS_NEXT: usize = 1;
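
// Compile-time sanity checks: an index encodes a slot count in its upper bits
// and metadata in the lower `SHIFT` bits, so a slot's offset within its block
// is `(index >> SHIFT) % LAP`. The assertions below spell out invariants that
// the code relies on.
const _: () = {
    // The three slot-state flags must occupy distinct bits.
    assert!(WRITE & READ == 0 && WRITE & DESTROY == 0 && READ & DESTROY == 0);
    // `len` reduces offsets with `& (LAP - 1)`, which requires a power of two.
    assert!(LAP.is_power_of_two());
    // The `HAS_NEXT` metadata bit must fit in the bits reserved by `SHIFT`.
    assert!(HAS_NEXT < 1 << SHIFT);
};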

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    const LAYOUT: Layout = {
        let layout = Layout::new::<Self>();
        assert!(
            layout.size() != 0,
            "Block should never be zero-sized, as it has an AtomicPtr field"
        );
        layout
    };

    /// Creates an empty block.
    fn new() -> Box<Self> {
        // SAFETY: layout is not zero-sized
        let ptr = unsafe { alloc_zeroed(Self::LAYOUT) };
        // Handle allocation failure
        if ptr.is_null() {
            handle_alloc_error(Self::LAYOUT)
        }
        // SAFETY: This is safe because:
        // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        // [3] `Slot::value` (UnsafeCell) may be safely zero initialized because it
        //     holds a MaybeUninit.
        // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        // TODO: unsafe { Box::new_zeroed().assume_init() }
        unsafe { Box::from_raw(ptr.cast()) }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because the thread
        // that reads from the last slot is the one that begins destroying the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Set the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, so it is now safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded multi-producer multi-consumer queue.
///
/// This queue is implemented as a linked list of segments, where each segment is a small buffer
/// that can hold a handful of elements. There is no limit to how many elements can be in the queue
/// at a time. However, since segments need to be dynamically allocated as elements get pushed,
/// this queue is somewhat slower than [`ArrayQueue`].
///
/// [`ArrayQueue`]: super::ArrayQueue
///
/// # Examples
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
///
/// q.push('a');
/// q.push('b');
///
/// assert_eq!(q.pop(), Some('a'));
/// assert_eq!(q.pop(), Some('b'));
/// assert!(q.pop().is_none());
/// ```
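///
/// A queue can also be shared between threads. Below is a minimal sketch
/// using `std::sync::Arc` and `std::thread`; any other way of sharing the
/// queue across threads works just as well:
///
/// ```
/// use std::sync::Arc;
/// use std::thread;
///
/// use crossbeam_queue::SegQueue;
///
/// let q = Arc::new(SegQueue::new());
///
/// // Two producers push disjoint ranges concurrently.
/// let handles: Vec<_> = (0..2)
///     .map(|t| {
///         let q = Arc::clone(&q);
///         thread::spawn(move || {
///             for i in 0..100 {
///                 q.push(t * 100 + i);
///             }
///         })
///     })
///     .collect();
/// for handle in handles {
///     handle.join().unwrap();
/// }
///
/// // All 200 elements arrive, though the interleaving of the two producers
/// // is unspecified.
/// assert_eq!(q.len(), 200);
/// ```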
pub struct SegQueue<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `SegQueue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

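// SAFETY: The queue owns its values and synchronizes all shared access through
// atomics. Values are moved in and out but never shared by reference across
// threads, so `T: Send` suffices and `T: Sync` is not required.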
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}

impl<T> UnwindSafe for SegQueue<T> {}
impl<T> RefUnwindSafe for SegQueue<T> {}

impl<T> SegQueue<T> {
    /// Creates a new unbounded queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::<i32>::new();
    /// ```
    pub const fn new() -> SegQueue<T> {
        SegQueue {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            _marker: PhantomData,
        }
    }

    /// Pushes an element to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// ```
    pub fn push(&self, value: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first push operation, we need to allocate the first block.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Pops an element from the front of the queue.
    ///
    /// If the queue is empty, `None` is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// q.push(10);
    /// q.push(20);
    /// assert_eq!(q.pop(), Some(10));
    /// assert_eq!(q.pop(), Some(20));
    /// assert!(q.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & HAS_NEXT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    return None;
                }

                // If head and tail are not in the same block, set `HAS_NEXT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= HAS_NEXT;
                }
            }

            // The block can be null here only if the first push operation is in progress. In that
            // case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= HAS_NEXT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Some(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Returns `true` if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    ///
    /// assert!(q.is_empty());
    /// q.push(1);
    /// assert!(!q.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_queue::SegQueue;
    ///
    /// let q = SegQueue::new();
    /// assert_eq!(q.len(), 0);
    ///
    /// q.push(10);
    /// assert_eq!(q.len(), 1);
    ///
    /// q.push(20);
    /// assert_eq!(q.len(), 2);
    /// ```
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of reserved
                // block-end indices between `head` and `tail`.
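                // For example, with `LAP == 32`: if `head` rotates to slot 3
                // and `tail` to slot 40 (one block boundary crossed), the raw
                // difference 37 still counts the reserved index 31, and
                // `tail / LAP == 1` removes it, giving 36 elements.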
                return tail - head - tail / LAP;
            }
        }
    }
}

impl<T> Drop for SegQueue<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.value.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

impl<T> fmt::Debug for SegQueue<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("SegQueue { .. }")
    }
}

impl<T> Default for SegQueue<T> {
    fn default() -> SegQueue<T> {
        SegQueue::new()
    }
}

impl<T> IntoIterator for SegQueue<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { value: self }
    }
}

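/// An owning iterator over the elements of a [`SegQueue`].
///
/// This struct is created by the [`IntoIterator`] implementation of
/// [`SegQueue`]. Iteration takes ownership of the queue and yields its
/// elements in the order they were pushed:
///
/// ```
/// use crossbeam_queue::SegQueue;
///
/// let q = SegQueue::new();
/// q.push(1);
/// q.push(2);
/// q.push(3);
///
/// let v: Vec<i32> = q.into_iter().collect();
/// assert_eq!(v, [1, 2, 3]);
/// ```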
#[derive(Debug)]
pub struct IntoIter<T> {
    value: SegQueue<T>,
}

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        let value = &mut self.value;
        let head = *value.head.index.get_mut();
        let tail = *value.tail.index.get_mut();
        if head >> SHIFT == tail >> SHIFT {
            None
        } else {
            let block = *value.head.block.get_mut();
            let offset = (head >> SHIFT) % LAP;

            // SAFETY: We have mutable access to this, so we can read without
            // worrying about concurrency. Furthermore, we know this is
            // initialized because it is the value pointed at by `value.head`
            // and this is a non-empty queue.
            let item = unsafe {
                let slot = (*block).slots.get_unchecked(offset);
                slot.value.get().read().assume_init()
            };
            if offset + 1 == BLOCK_CAP {
                // Deallocate the block and move to the next one.
                // SAFETY: The block is initialized because we've been reading
                // from it this entire time. We can drop it because everything
                // has been read out of it, so nothing points into it anymore.
                unsafe {
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    *value.head.block.get_mut() = next;
                }
                // The last slot in each block is never used, so skip an extra index.
                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
                // Double-check that we're pointing to the first item in a block.
                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
            } else {
                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
            }
            Some(item)
        }
    }
}
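
// Sanity tests for the index arithmetic above: a minimal sketch exercising
// only the public API, not an exhaustive suite.
#[cfg(test)]
mod sanity_tests {
    use super::*;

    #[test]
    fn len_tracks_pushes_across_block_boundaries() {
        // Cross several block boundaries so that `len` has to discount the
        // reserved last index of each lap.
        let q = SegQueue::new();
        for i in 0..3 * BLOCK_CAP {
            q.push(i);
            assert_eq!(q.len(), i + 1);
        }
        for i in 0..3 * BLOCK_CAP {
            assert_eq!(q.pop(), Some(i));
        }
        assert!(q.is_empty());
    }

    #[test]
    fn into_iter_yields_fifo_order() {
        let q = SegQueue::new();
        let n = 2 * BLOCK_CAP + 5;
        for i in 0..n {
            q.push(i);
        }
        let mut expected = 0;
        for x in q {
            assert_eq!(x, expected);
            expected += 1;
        }
        assert_eq!(expected, n);
    }
}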