use crate::{
cfg::{self, CfgPrivate},
clear::Clear,
page,
sync::{
alloc,
atomic::{
AtomicPtr, AtomicUsize,
Ordering::{self, *},
},
},
tid::Tid,
Pack,
};
use std::{fmt, ptr, slice};
// ┌─────────────┐      ┌────────┐
// │ page 1      │      │        │
// ├─────────────┤ ┌───▶│  next──┼─┐
// │ page 2      │ │    ├────────┤ │
// │             │ │    │XXXXXXXX│ │
// │ local_free──┼─┘    ├────────┤ │
// │ global_free─┼─┐    │        │◀┘
// ├─────────────┤ └───▶│  next──┼─┐
// │   page 3    │      ├────────┤ │
// └─────────────┘      │XXXXXXXX│ │
//       ...            ├────────┤ │
// ┌─────────────┐      │XXXXXXXX│ │
// │   page n    │      ├────────┤ │
// └─────────────┘      │        │◀┘
//                      │  next──┼───▶
//                      ├────────┤
//                      │XXXXXXXX│
//                      └────────┘
//                          ...
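//
// Each page's slots are threaded onto two intrusive free lists: a local list
// (`local_free`), pushed to only by the shard's owning thread, and a shared
// list (`global_free`), pushed to by other threads. The `*_local` /
// `*_remote` method pairs below select between the two.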
pub(crate) struct Shard<T, C: cfg::Config> {
/// The shard's parent thread ID.
pub(crate) tid: usize,
/// The local free list for each page.
///
    /// These are only ever accessed from this shard's own thread, so they are
    /// stored separately from the page's shared state, which may be accessed
    /// concurrently, in order to minimize false sharing.
local: Box<[page::Local]>,
/// The shared state for each page in this shard.
///
/// This consists of the page's metadata (size, previous size), remote free
/// list, and a pointer to the actual array backing that page.
shared: Box<[page::Shared<T, C>]>,
}
pub(crate) struct Array<T, C: cfg::Config> {
shards: Box<[Ptr<T, C>]>,
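    /// The highest shard index that has been allocated so far. This is used
    /// by `iter_mut` and `Drop` to bound their scans over the array.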
max: AtomicUsize,
}
#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);
#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);
// === impl Shard ===
impl<T, C> Shard<T, C>
where
C: cfg::Config,
{
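    /// Runs `f` with a reference to the slot at the packed index `idx`,
    /// returning `None` if the index's page is out of range.
    ///
    /// A rough sketch of how this module decomposes a packed index (the
    /// exact bit widths are determined by the `cfg::Config` in use):
    ///
    /// ```text
    /// [ thread id (Tid) | generation | address ]
    ///                                     └─ split by `page::indices` into
    ///                                        (addr, page_index)
    /// ```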
#[inline(always)]
pub(crate) fn with_slot<'a, U>(
&'a self,
idx: usize,
f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
) -> Option<U> {
debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
let (addr, page_index) = page::indices::<C>(idx);
test_println!("-> {:?}", addr);
if page_index >= self.shared.len() {
return None;
}
self.shared[page_index].with_slot(addr, f)
}
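    /// Allocates the local and shared metadata for a new shard.
    ///
    /// Page sizes come from `C::page_size`, and each page records `prev_sz`,
    /// the total capacity of all preceding pages, so a slot's global address
    /// can later be translated into an offset within its page. For
    /// illustration only, if pages started at 32 slots and doubled:
    ///
    /// ```text
    /// page 0: sz = 32,  prev_sz = 0    (addresses   0..32)
    /// page 1: sz = 64,  prev_sz = 32   (addresses  32..96)
    /// page 2: sz = 128, prev_sz = 96   (addresses 96..224)
    /// ```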
pub(crate) fn new(tid: usize) -> Self {
let mut total_sz = 0;
let shared = (0..C::MAX_PAGES)
.map(|page_num| {
let sz = C::page_size(page_num);
let prev_sz = total_sz;
total_sz += sz;
page::Shared::new(sz, prev_sz)
})
.collect();
let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
Self { tid, local, shared }
}
}
impl<T, C> Shard<Option<T>, C>
where
C: cfg::Config,
{
    /// Removes and returns an item; must be called from the shard's own
    /// thread.
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);
        test_println!("-> take_local {:?}", addr);
self.shared
.get(page_index)?
.take(addr, C::unpack_gen(idx), self.local(page_index))
}
    /// Removes and returns an item; called from a thread other than the one
    /// that owns this shard.
pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
debug_assert!(Tid::<C>::current().as_usize() != self.tid);
let (addr, page_index) = page::indices::<C>(idx);
test_println!("-> take_remote {:?}; page {:?}", addr, page_index);
let shared = self.shared.get(page_index)?;
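        // We are not on the shard's own thread, so the freed slot must be
        // pushed onto the page's shared (remote) free list rather than the
        // thread-local one.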
shared.take(addr, C::unpack_gen(idx), shared.free_list())
}
pub(crate) fn remove_local(&self, idx: usize) -> bool {
debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
let (addr, page_index) = page::indices::<C>(idx);
if page_index >= self.shared.len() {
return false;
}
self.shared[page_index].remove(addr, C::unpack_gen(idx), self.local(page_index))
}
pub(crate) fn remove_remote(&self, idx: usize) -> bool {
debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
let (addr, page_index) = page::indices::<C>(idx);
if page_index >= self.shared.len() {
return false;
}
let shared = &self.shared[page_index];
shared.remove(addr, C::unpack_gen(idx), shared.free_list())
}
pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
self.shared.iter()
}
}
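// Shards of `Clear + Default` values use a two-phase removal protocol: an
// entry is first marked for clearing (`mark_clear_local` /
// `mark_clear_remote`), and the slot's value is actually cleared by
// `clear_after_release` once the last outstanding reference to the entry has
// been dropped. A hedged sketch of the flow from a caller's point of view
// (the caller code here is illustrative, not part of this module):
//
//     shard.mark_clear_local(idx);     // entry can no longer be looked up
//     // ...the last guard referencing the entry is dropped, which calls...
//     shard.clear_after_release(idx);  // the slot's value is cleared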
impl<T, C> Shard<T, C>
where
T: Clear + Default,
C: cfg::Config,
{
pub(crate) fn init_with<U>(
&self,
mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
) -> Option<U> {
        // Can we fit the value into an existing page?
for (page_idx, page) in self.shared.iter().enumerate() {
let local = self.local(page_idx);
test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);
if let Some(res) = page.init_with(local, &mut init) {
return Some(res);
}
}
None
}
pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
let (addr, page_index) = page::indices::<C>(idx);
if page_index >= self.shared.len() {
return false;
}
self.shared[page_index].mark_clear(addr, C::unpack_gen(idx), self.local(page_index))
}
pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
let (addr, page_index) = page::indices::<C>(idx);
if page_index >= self.shared.len() {
return false;
}
let shared = &self.shared[page_index];
shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list())
}
pub(crate) fn clear_after_release(&self, idx: usize) {
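        // This acquire fence is presumed to pair with the release operation
        // performed when the entry's last guard was dropped, ensuring the
        // slot's final state is visible to us before it is cleared.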
crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
let tid = Tid::<C>::current().as_usize();
test_println!(
"-> clear_after_release; self.tid={:?}; current.tid={:?};",
tid,
self.tid
);
if tid == self.tid {
self.clear_local(idx);
} else {
self.clear_remote(idx);
}
}
fn clear_local(&self, idx: usize) -> bool {
debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
let (addr, page_index) = page::indices::<C>(idx);
if page_index >= self.shared.len() {
return false;
}
self.shared[page_index].clear(addr, C::unpack_gen(idx), self.local(page_index))
}
fn clear_remote(&self, idx: usize) -> bool {
debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
let (addr, page_index) = page::indices::<C>(idx);
if page_index >= self.shared.len() {
return false;
}
let shared = &self.shared[page_index];
shared.clear(addr, C::unpack_gen(idx), shared.free_list())
}
#[inline(always)]
fn local(&self, i: usize) -> &page::Local {
#[cfg(debug_assertions)]
debug_assert_eq_in_drop!(
Tid::<C>::current().as_usize(),
self.tid,
"tried to access local data from another thread!"
);
&self.local[i]
}
}
impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut d = f.debug_struct("Shard");
#[cfg(debug_assertions)]
d.field("tid", &self.tid);
d.field("shared", &self.shared).finish()
}
}
// === impl Array ===
impl<T, C> Array<T, C>
where
C: cfg::Config,
{
pub(crate) fn new() -> Self {
let mut shards = Vec::with_capacity(C::MAX_SHARDS);
for _ in 0..C::MAX_SHARDS {
// XXX(eliza): T_T this could be avoided with maybeuninit or something...
shards.push(Ptr::null());
}
Self {
shards: shards.into(),
max: AtomicUsize::new(0),
}
}
#[inline]
pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
test_println!("-> get shard={}", idx);
self.shards.get(idx)?.load(Acquire)
}
#[inline]
pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
let tid = Tid::<C>::current();
test_println!("current: {:?}", tid);
let idx = tid.as_usize();
assert!(
idx < self.shards.len(),
"Thread count overflowed the configured max count. \
Thread index = {}, max threads = {}.",
idx,
C::MAX_SHARDS,
);
// It's okay for this to be relaxed. The value is only ever stored by
// the thread that corresponds to the index, and we are that thread.
let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
self.shards[idx].set(ptr);
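            // Bump the recorded maximum shard index so that `iter_mut` and
            // `Drop` know how far into the array to scan: a standard CAS
            // loop, retrying with the observed value until our index is no
            // longer the greater one.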
let mut max = self.max.load(Acquire);
while max < idx {
match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
Ok(_) => break,
Err(actual) => max = actual,
}
}
test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
unsafe {
// Safety: we just put it there!
&*ptr
}
.get_ref()
});
(tid, shard)
}
pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
test_println!("Array::iter_mut");
let max = self.max.load(Acquire);
test_println!("-> highest index={}", max);
IterMut(self.shards[0..=max].iter_mut())
}
}
impl<T, C: cfg::Config> Drop for Array<T, C> {
fn drop(&mut self) {
// XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
let max = self.max.load(Acquire);
for shard in &self.shards[0..=max] {
let ptr = shard.0.load(Acquire);
if ptr.is_null() {
continue;
}
let shard = unsafe {
// Safety: this is the only place where these boxes are
// deallocated, and we have exclusive access to the shard array,
// because...we are dropping it...
Box::from_raw(ptr)
};
drop(shard)
}
}
}
impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let max = self.max.load(Acquire);
let mut set = f.debug_map();
for shard in &self.shards[0..=max] {
let ptr = shard.0.load(Acquire);
if let Some(shard) = ptr::NonNull::new(ptr) {
set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
} else {
set.entry(&format_args!("{:p}", ptr), &());
}
}
set.finish()
}
}
// === impl Ptr ===
impl<T, C: cfg::Config> Ptr<T, C> {
#[inline]
fn null() -> Self {
Self(AtomicPtr::new(ptr::null_mut()))
}
#[inline]
fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
let ptr = self.0.load(order);
test_println!("---> loaded={:p} (order={:?})", ptr, order);
if ptr.is_null() {
test_println!("---> null");
return None;
}
let track = unsafe {
// Safety: The returned reference will have the same lifetime as the
// reference to the shard pointer, which (morally, if not actually)
// owns the shard. The shard is only deallocated when the shard
// array is dropped, and it won't be dropped while this pointer is
// borrowed --- and the returned reference has the same lifetime.
//
// We know that the pointer is not null, because we just
// null-checked it immediately prior.
&*ptr
};
Some(track.get_ref())
}
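    /// Installs `new` as this slot's shard pointer.
    ///
    /// The compare-and-swap from null enforces that a shard is installed at
    /// most once; since only the owning thread ever stores here, observing a
    /// non-null value would indicate a bug.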
#[inline]
fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
self.0
.compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
            .expect("a shard can only be inserted by the thread that owns it; this is a bug!");
}
}
// === Iterators ===
impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
T: 'a,
C: cfg::Config + 'a,
{
type Item = &'a Shard<T, C>;
fn next(&mut self) -> Option<Self::Item> {
test_println!("IterMut::next");
loop {
// Skip over empty indices if they are less than the highest
// allocated shard. Some threads may have accessed the slab
// (generating a thread ID) but never actually inserted data, so
// they may have never allocated a shard.
let next = self.0.next();
test_println!("-> next.is_some={}", next.is_some());
if let Some(shard) = next?.load(Acquire) {
test_println!("-> done");
return Some(shard);
}
}
}
}