blob: 6f456ce0e86fbccc26ab1cb9cad616855c944331 [file] [log] [blame]
extern crate crossbeam_queue;
use self::crossbeam_queue::SegQueue;
use std::io;
use std::process::ExitStatus;
/// An interface for waiting on a process to exit.
pub(crate) trait Wait {
/// Get the identifier for this process or diagnostics.
fn id(&self) -> u32;
/// Try waiting for a process to exit in a non-blocking manner.
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
}
impl<'a, T: 'a + Wait> Wait for &'a mut T {
fn id(&self) -> u32 {
(**self).id()
}
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
(**self).try_wait()
}
}
/// An interface for queueing up an orphaned process so that it can be reaped.
pub(crate) trait OrphanQueue<T> {
/// Add an orphan to the queue.
fn push_orphan(&self, orphan: T);
/// Attempt to reap every process in the queue, ignoring any errors and
/// enqueueing any orphans which have not yet exited.
fn reap_orphans(&self);
}
impl<'a, T, O: 'a + OrphanQueue<T>> OrphanQueue<T> for &'a O {
fn push_orphan(&self, orphan: T) {
(**self).push_orphan(orphan);
}
fn reap_orphans(&self) {
(**self).reap_orphans()
}
}
/// An atomic implementation of `OrphanQueue`.
#[derive(Debug)]
pub(crate) struct AtomicOrphanQueue<T> {
queue: SegQueue<T>,
}
impl<T> AtomicOrphanQueue<T> {
pub(crate) fn new() -> Self {
Self {
queue: SegQueue::new(),
}
}
}
impl<T: Wait> OrphanQueue<T> for AtomicOrphanQueue<T> {
fn push_orphan(&self, orphan: T) {
self.queue.push(orphan)
}
fn reap_orphans(&self) {
let len = self.queue.len();
if len == 0 {
return;
}
let mut orphans = Vec::with_capacity(len);
while let Ok(mut orphan) = self.queue.pop() {
match orphan.try_wait() {
Ok(Some(_)) => {}
Err(e) => error!(
"leaking orphaned process {} due to try_wait() error: {}",
orphan.id(),
e,
),
// Still not done yet, we need to put it back in the queue
// when were done draining it, so that we don't get stuck
// in an infinite loop here
Ok(None) => orphans.push(orphan),
}
}
for orphan in orphans {
self.queue.push(orphan);
}
}
}
#[cfg(test)]
mod test {
use super::Wait;
use super::{AtomicOrphanQueue, OrphanQueue};
use std::cell::Cell;
use std::io;
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
use std::rc::Rc;
struct MockWait {
total_waits: Rc<Cell<usize>>,
num_wait_until_status: usize,
return_err: bool,
}
impl MockWait {
fn new(num_wait_until_status: usize) -> Self {
Self {
total_waits: Rc::new(Cell::new(0)),
num_wait_until_status,
return_err: false,
}
}
fn with_err() -> Self {
Self {
total_waits: Rc::new(Cell::new(0)),
num_wait_until_status: 0,
return_err: true,
}
}
}
impl Wait for MockWait {
fn id(&self) -> u32 {
42
}
fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
let waits = self.total_waits.get();
let ret = if self.num_wait_until_status == waits {
if self.return_err {
Ok(Some(ExitStatus::from_raw(0)))
} else {
Err(io::Error::new(io::ErrorKind::Other, "mock err"))
}
} else {
Ok(None)
};
self.total_waits.set(waits + 1);
ret
}
}
#[test]
fn drain_attempts_a_single_reap_of_all_queued_orphans() {
let first_orphan = MockWait::new(0);
let second_orphan = MockWait::new(1);
let third_orphan = MockWait::new(2);
let fourth_orphan = MockWait::with_err();
let first_waits = first_orphan.total_waits.clone();
let second_waits = second_orphan.total_waits.clone();
let third_waits = third_orphan.total_waits.clone();
let fourth_waits = fourth_orphan.total_waits.clone();
let orphanage = AtomicOrphanQueue::new();
orphanage.push_orphan(first_orphan);
orphanage.push_orphan(third_orphan);
orphanage.push_orphan(second_orphan);
orphanage.push_orphan(fourth_orphan);
assert_eq!(orphanage.queue.len(), 4);
orphanage.reap_orphans();
assert_eq!(orphanage.queue.len(), 2);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 1);
assert_eq!(third_waits.get(), 1);
assert_eq!(fourth_waits.get(), 1);
orphanage.reap_orphans();
assert_eq!(orphanage.queue.len(), 1);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 2);
assert_eq!(third_waits.get(), 2);
assert_eq!(fourth_waits.get(), 1);
orphanage.reap_orphans();
assert_eq!(orphanage.queue.len(), 0);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 2);
assert_eq!(third_waits.get(), 3);
assert_eq!(fourth_waits.get(), 1);
orphanage.reap_orphans(); // Safe to reap when empty
}
}