/*! Waste Management
Do you drive your own garbage to the landfill or compost heap? Maybe you should,
but that’s a lot of work and takes time out of your day, so you probably don’t.
Instead, you give it to a worker that specializes in managing objects at the end
of their time in your service.
This module moves objects from the threads where they were working to a single,
global, worker thread when they go out of scope. Since an object that is going
out of scope can no longer be used, you could say that it is *garbage*; since
there is only one worker thread to receive all such objects, you could say that
the worker *collects* them. Wink wink, nudge nudge.
Users need only wrap their values in `BgDrop` to have their garbage collected.
`BgDrop` only accepts `'static` values, since the values are being sent to
another thread that makes no guarantees about timeliness of destruction, and
thus the garbage cannot have any lingering ties to live objects in the rest of
the program.
When a `BgDrop` goes out of scope, it attempts to send its interior value to the
collector thread. The first `BgDrop` to drop must start the collector thread,
which may result in an indefinite block until the thread begins. Once the
collector is running, all `BgDrop` drops will *attempt* to send their internal
value to the collector for destruction. If the send fails, then the value will
be dropped on the sending thread, rather than on the collector.
You can prevent future collections with `shutdown()`, which destroys the
channel used to move values to the collector thread and then joins the
collector. If you need to ensure that all pending destructions occur before
program exit, you should end your program with a call to `shutdown()`. The drop
logic guarantees that each object is either enqueued for deferred destruction
*or* destroyed immediately on its current thread, so the collector thread
*will* receive a signal for each object not destroyed locally.
!*/
#![cfg(all(feature = "std", feature = "garbage"))]
use once_cell::sync::OnceCell;
use tap::Pipe;
use std::{
collections::VecDeque,
marker::PhantomData,
mem::{
self,
ManuallyDrop,
},
ops::{
Deref,
DerefMut,
},
sync::{
mpsc,
Mutex,
MutexGuard,
Once,
RwLock,
},
thread,
};
use typemap::TypeMap;
/** Run an object’s destructor in the background.
When a `BgDrop`-wrapped object goes out of scope, the `BgDrop` destructor
attempts to send the wrapped value to a global background thread, so that its
destructor runs on that worker. If the thread running a `BgDrop`
destructor is able to send the value to the worker, then it resumes immediately,
and does not wait for the worker to get around to actually running the wrapped
destructor. This is similar to the disposal semantics of many GC systems, though
the actual system used to determine when an object becomes garbage is still the
compiler’s static lifetime analyzer.
All `BgDrop` types use the same persistent worker thread, minimizing the program
cost of deferral.
If the function [`wm::shutdown()`] is called, all future `BgDrop`s become a noöp
and run their contained destructors on their local threads.
[`wm::shutdown()`]: ../fn.shutdown.html
**/
#[repr(transparent)]
pub struct BgDrop<T: 'static> {
inner: ManuallyDrop<T>,
}
impl<T: 'static> BgDrop<T> {
/// Instructs an object to run its destructor in the background.
///
/// This function modifies the wrapped object’s `Drop` implementation to try
/// to, on `drop`, send the inner object to a background thread for actual
/// destruction. If the object cannot be sent to the background when its
/// wrapper goes out of scope, then its destructor runs immediately, in the
/// thread that had been holding the object when the modified destructor was
/// called.
///
/// If the wrapped object is successfully sent to the background, the
/// modified destructor exits, and the current thread resumes work. Once
/// enqueued, the inner object is guaranteed to be *eventually* destroyed,
/// unless the program exits in a manner that prevents the background
/// collector from emptying its work queue.
#[inline(always)]
pub fn new(value: T) -> Self {
Self {
inner: ManuallyDrop::new(value),
}
}
/// Removes the background-destruction marker, returning the interior value.
#[inline(always)]
pub fn into_inner(mut self) -> T {
unsafe { ManuallyDrop::take(&mut self.inner) }
}
/// Attempt to prevent double-deferral, which would cause the outer to send
/// the inner to the worker thread, making the worker thread send the
/// *actual* inner to itself for destruction. This is safe, but stupid.
#[inline(always)]
#[doc(hidden)]
pub fn bg_drop(self) -> Self {
self
}
#[inline(always)]
fn dtor(&mut self) {
// No destructor, no problem! Quit.
if !mem::needs_drop::<T>() {
return;
}
// Ensure that the collector has been initialized.
init();
// Pull the value into local scope, reärming the destructor.
let val = unsafe { ManuallyDrop::take(&mut self.inner) };
// Get a local copy of the outbound channel, or exit.
let sender = match sender() {
Some(s) => s,
None => return,
};
// Enqueue the object into the transfer buffer.
dq().entry::<Key<T>>()
.or_insert_with(VecDeque::new)
.pipe(|v| v.push_back(val));
// Send the dequeueïng destructor to the collector thread, or run it
// locally if the send failed.
if sender.send(dtor::<T>).is_err() {
dtor::<T>();
}
}
}
impl<T: 'static> AsRef<T> for BgDrop<T> {
fn as_ref(&self) -> &T {
&*self.inner
}
}
impl<T: 'static> AsMut<T> for BgDrop<T> {
fn as_mut(&mut self) -> &mut T {
&mut *self.inner
}
}
impl<T: 'static> Deref for BgDrop<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&*self.inner
}
}
impl<T: 'static> DerefMut for BgDrop<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.inner
}
}
impl<T: 'static> Drop for BgDrop<T> {
fn drop(&mut self) {
self.dtor();
}
}
/// Attaches a `BgDrop` constructor to all suitable types.
pub trait BgDropExt: Sized + 'static {
/// Modifies the object’s destructor to run in the background.
///
/// When this value goes out of scope, it will attempt to send itself to a
/// background thread where its *actual* destructor will be run. The actual
/// destructor will run on the local thread only if the transfer to the
/// background worker was unable to occur.
///
/// The background worker is started only when the first value marked for
/// deferred destruction actually drops, so the first call will block until
/// the disposal system is initialized.
///
/// The first value of each *type* to be deferred will modify the disposal
/// system to handle its type.
///
/// All subsequent drops of a type that has been deferred before will happen
/// nearly instantaneously, as they must only observe that the system is set
/// up for them, and move the value into the transfer queue.
///
/// # Usage
///
/// ```rust
/// use wyz::wm::BgDropExt;
///
/// vec![1, 2, 3, 4, 5].bg_drop();
/// vec![6, 7, 8, 9, 10].bg_drop();
/// ```
///
/// If you need to guarantee that your program remains open until all
/// deferred objects are destroyed, you can block on [`wm::shutdown()`].
///
/// [`wm::shutdown()`]: ../fn.shutdown.html
fn bg_drop(self) -> BgDrop<Self> {
BgDrop::new(self)
}
}
impl<T: Sized + 'static> BgDropExt for T {
}
/** Stop the background disposal system.
This function shuts down the disposal system, and ensures that all deferred
destructors in the program are correctly handled. It disables all *future*
deferred-drops from sending values to the worker thread, which forces them to
run their destructors locally. In the meantime, the worker thread will pull all
remaining values out of its work queue and destroy them, then terminate once it
sees that its queue has been closed.
When this function returns, the worker thread will have emptied its queue, torn
down its transfer system, and exited.
You may call this function more than once; it is idempotent. The worker system
is program-global, and will only be started once and stopped once. Once this
function is called, the program will never run deferred disposal again.
Rust does not provide a portable `atexit` behavior, so you are responsible for
calling this before your program terminates if you want to ensure that all
deferred destructions actually take place. Future versions of this library may
register `wm::shutdown()` with the system `atexit` handler. If this occurs, the
function will be marked as deprecated on platforms where it is set.
**/
pub fn shutdown() {
static STOP: Once = Once::new();
STOP.call_once(|| {
// Destroy the sender handle.
let _: Option<AssertThreadsafe<mpsc::Sender<Dtor>>> = SEND
.get()
// Lock the write guard,
.and_then(|rw| rw.write().ok())
// And remove the sender handle from it.
.and_then(|mut sender| sender.take());
// Close the destructor thread.
let _: Option<()> = JOIN
.get()
// Lock the thread’s mutex,
.and_then(|mx| mx.lock().ok())
// Remove the handle from it,
.and_then(|mut mg| mg.take())
// And await the thread’s termination.
.and_then(|jh| jh.join().ok());
});
}
// Disposal system implementation
type Dtor = fn() -> ();
// The sender is never used concurrently.
static SEND: OnceCell<RwLock<Option<AssertThreadsafe<mpsc::Sender<Dtor>>>>> =
OnceCell::new();
// The map is only ever used behind a mutex lock.
static DUMP: OnceCell<Mutex<AssertThreadsafe<TypeMap>>> = OnceCell::new();
static JOIN: OnceCell<Mutex<Option<thread::JoinHandle<()>>>> = OnceCell::new();
/// Initialize the collection system.
#[inline(never)]
fn init() {
let (send, recv) = mpsc::channel::<Dtor>();
// Establish a base sending channel. This holds the collector open until
// `shutdown()` is called.
SEND.get_or_init(|| {
send.pipe(AssertThreadsafe::new)
.pipe(Some)
.pipe(RwLock::new)
});
// Establish a transfer queue for all types.
DUMP.get_or_init(|| {
TypeMap::new().pipe(AssertThreadsafe::new).pipe(Mutex::new)
});
// Start the collector thread.
JOIN.get_or_init(|| {
thread::spawn(move || {
while let Ok(ev) = recv.recv() {
(ev)()
}
let _ = mem::replace(&mut **dq(), TypeMap::new());
})
.pipe(Some)
.pipe(Mutex::new)
});
// TODO(myrrlyn): Register an `atexit` handler to run `shutdown()`.
}
/// Lock the transfer map.
fn dq() -> MutexGuard<'static, AssertThreadsafe<TypeMap>> {
unsafe { DUMP.get_unchecked() }
.lock()
.expect("Collection buffer should never observe a panic")
}
/// Pull the front object out of a typed queue, and destroy it.
fn dtor<T: 'static>() {
// Binding a value causes it to drop *after* any temporaries created in its
// construction.
let _tmp = dq()
// View the deque containing objects of this type.
.get_mut::<Key<T>>()
// And pop the front value in the queue. It is acceptable to fail.
.and_then(VecDeque::pop_front);
// The mutex lock returned by `dq()` drops immediately after the semicolon,
// and the `_tmp` binding drops immediately before the terminating brace.
}
/// Get a local copy of the sender, free of threading concerns.
fn sender() -> Option<mpsc::Sender<Dtor>> {
// `sender` is only called after `SEND` is initialized
unsafe { SEND.get_unchecked() }
// Quit if the send channel could not be opened for reading
.read()
.ok()?
// or if it contains `None`
.as_ref()?
.inner
// and copy the sender into the local thread.
.clone()
.pipe(Some)
}
/// Look up a type’s location in the transfer map.
struct Key<T: 'static>(PhantomData<T>);
impl<T: 'static> typemap::Key for Key<T> {
/// The transfer map holds some form of collection of the transferred types.
///
/// The specific collection type is irrelevant, as long as it supports both
/// insertion and removal, and has reasonable behavior characteristics.
/// Since the map has to be completely locked for any transfer event, as the
/// first transfer of each type must insert its queue into the map, there is
/// no advantage in trying to make this mpsc-friendly.
///
/// If Rust were to allow a construction like
///
/// ```rust,ignore
/// fn type_chan<T: 'static>() -> (
/// &'static mpsc::Sender<T>,
/// &'static mpsc::Receiver<T>,
/// ) {
/// static MAKE: Once = Once::new();
/// static mut CHAN: Option<(mpsc::Sender<T>, mpsc::Receiver<T>)> = None;
/// MAKE.call_once(|| unsafe {
/// CHAN = Some(mpsc::channel());
/// });
/// let chan = unsafe { CHAN.as_ref().unwrap() };
/// (&chan.0, &chan.1)
/// }
/// ```
///
/// then a dynamic type-map would not be necessary at all, since each type
/// would be granted its own dedicated channel at compile-time. But Rust
/// does not, so, it is.
type Value = VecDeque<T>;
}
/** Get off my back, `rustc`.
This is required because `static` vars must be `Sync`, and the thread-safety
wrappers in use apparently inherit the `Sync`hronicity of their wrapped types.
This module uses them correctly, and does not permit escape, so this struct is
needed to get the compiler to accept our use.
**/
#[repr(transparent)]
struct AssertThreadsafe<T> {
inner: T,
}
impl<T> AssertThreadsafe<T> {
fn new(inner: T) -> Self {
Self { inner }
}
}
unsafe impl<T> Send for AssertThreadsafe<T> {
}
unsafe impl<T> Sync for AssertThreadsafe<T> {
}
impl<T> Deref for AssertThreadsafe<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> DerefMut for AssertThreadsafe<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{
sync::atomic::{
AtomicUsize,
Ordering::Relaxed,
},
thread,
time::Duration,
};
#[test]
fn trash_pickup() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
struct Deferrer<F: FnMut()>(F);
impl<F: FnMut()> Drop for Deferrer<F> {
fn drop(&mut self) {
(self.0)()
}
}
let kept = Deferrer(|| {
COUNTER.fetch_add(1, Relaxed);
});
let sent = Deferrer(|| {
COUNTER.fetch_add(1, Relaxed);
})
.bg_drop();
drop(kept);
drop(sent);
while COUNTER.load(Relaxed) < 2 {
thread::sleep(Duration::from_millis(100));
}
}
}