aboutsummaryrefslogtreecommitdiff
path: root/src/process/unix/orphan.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/process/unix/orphan.rs')
-rw-r--r--src/process/unix/orphan.rs195
1 files changed, 149 insertions, 46 deletions
diff --git a/src/process/unix/orphan.rs b/src/process/unix/orphan.rs
index 8a1e127..07f0dcf 100644
--- a/src/process/unix/orphan.rs
+++ b/src/process/unix/orphan.rs
@@ -1,6 +1,9 @@
+use crate::loom::sync::{Mutex, MutexGuard};
+use crate::signal::unix::driver::Handle as SignalHandle;
+use crate::signal::unix::{signal_with_handle, SignalKind};
+use crate::sync::watch;
use std::io;
use std::process::ExitStatus;
-use std::sync::Mutex;
/// An interface for waiting on a process to exit.
pub(crate) trait Wait {
@@ -20,21 +23,8 @@ impl<T: Wait> Wait for &mut T {
}
}
-/// An interface for reaping a set of orphaned processes.
-pub(crate) trait ReapOrphanQueue {
- /// Attempts to reap every process in the queue, ignoring any errors and
- /// enqueueing any orphans which have not yet exited.
- fn reap_orphans(&self);
-}
-
-impl<T: ReapOrphanQueue> ReapOrphanQueue for &T {
- fn reap_orphans(&self) {
- (**self).reap_orphans()
- }
-}
-
/// An interface for queueing up an orphaned process so that it can be reaped.
-pub(crate) trait OrphanQueue<T>: ReapOrphanQueue {
+pub(crate) trait OrphanQueue<T> {
/// Adds an orphan to the queue.
fn push_orphan(&self, orphan: T);
}
@@ -48,50 +38,91 @@ impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
/// An implementation of `OrphanQueue`.
#[derive(Debug)]
pub(crate) struct OrphanQueueImpl<T> {
+ sigchild: Mutex<Option<watch::Receiver<()>>>,
queue: Mutex<Vec<T>>,
}
impl<T> OrphanQueueImpl<T> {
pub(crate) fn new() -> Self {
Self {
+ sigchild: Mutex::new(None),
queue: Mutex::new(Vec::new()),
}
}
#[cfg(test)]
fn len(&self) -> usize {
- self.queue.lock().unwrap().len()
+ self.queue.lock().len()
}
-}
-impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> {
- fn push_orphan(&self, orphan: T) {
- self.queue.lock().unwrap().push(orphan)
+ pub(crate) fn push_orphan(&self, orphan: T)
+ where
+ T: Wait,
+ {
+ self.queue.lock().push(orphan)
}
-}
-impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> {
- fn reap_orphans(&self) {
- let mut queue = self.queue.lock().unwrap();
- let queue = &mut *queue;
-
- for i in (0..queue.len()).rev() {
- match queue[i].try_wait() {
- Ok(None) => {}
- Ok(Some(_)) | Err(_) => {
- // The stdlib handles interruption errors (EINTR) when polling a child process.
- // All other errors represent invalid inputs or pids that have already been
- // reaped, so we can drop the orphan in case an error is raised.
- queue.swap_remove(i);
+ /// Attempts to reap every process in the queue, ignoring any errors and
+ /// enqueueing any orphans which have not yet exited.
+ pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
+ where
+ T: Wait,
+ {
+ // If someone else is holding the lock, they will be responsible for draining
+ // the queue as necessary, so we can safely bail if that happens
+ if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
+ match &mut *sigchild_guard {
+ Some(sigchild) => {
+ if sigchild.try_has_changed().and_then(Result::ok).is_some() {
+ drain_orphan_queue(self.queue.lock());
+ }
+ }
+ None => {
+ let queue = self.queue.lock();
+
+ // Be lazy and only initialize the SIGCHLD listener if there
+ // are any orphaned processes in the queue.
+ if !queue.is_empty() {
+ // An errors shouldn't really happen here, but if it does it
+ // means that the signal driver isn't running, in
+ // which case there isn't anything we can
+ // register/initialize here, so we can try again later
+ if let Ok(sigchild) = signal_with_handle(SignalKind::child(), &handle) {
+ *sigchild_guard = Some(sigchild);
+ drain_orphan_queue(queue);
+ }
+ }
}
}
}
}
}
+fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
+where
+ T: Wait,
+{
+ for i in (0..queue.len()).rev() {
+ match queue[i].try_wait() {
+ Ok(None) => {}
+ Ok(Some(_)) | Err(_) => {
+ // The stdlib handles interruption errors (EINTR) when polling a child process.
+ // All other errors represent invalid inputs or pids that have already been
+ // reaped, so we can drop the orphan in case an error is raised.
+ queue.swap_remove(i);
+ }
+ }
+ }
+
+ drop(queue);
+}
+
#[cfg(all(test, not(loom)))]
pub(crate) mod test {
use super::*;
+ use crate::io::driver::Driver as IoDriver;
+ use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
+ use crate::sync::watch;
use std::cell::{Cell, RefCell};
use std::io;
use std::os::unix::process::ExitStatusExt;
@@ -100,14 +131,12 @@ pub(crate) mod test {
pub(crate) struct MockQueue<W> {
pub(crate) all_enqueued: RefCell<Vec<W>>,
- pub(crate) total_reaps: Cell<usize>,
}
impl<W> MockQueue<W> {
pub(crate) fn new() -> Self {
Self {
all_enqueued: RefCell::new(Vec::new()),
- total_reaps: Cell::new(0),
}
}
}
@@ -118,12 +147,6 @@ pub(crate) mod test {
}
}
- impl<W> ReapOrphanQueue for MockQueue<W> {
- fn reap_orphans(&self) {
- self.total_reaps.set(self.total_reaps.get() + 1);
- }
- }
-
struct MockWait {
total_waits: Rc<Cell<usize>>,
num_wait_until_status: usize,
@@ -191,27 +214,107 @@ pub(crate) mod test {
assert_eq!(orphanage.len(), 4);
- orphanage.reap_orphans();
+ drain_orphan_queue(orphanage.queue.lock());
assert_eq!(orphanage.len(), 2);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 1);
assert_eq!(third_waits.get(), 1);
assert_eq!(fourth_waits.get(), 1);
- orphanage.reap_orphans();
+ drain_orphan_queue(orphanage.queue.lock());
assert_eq!(orphanage.len(), 1);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 2);
assert_eq!(third_waits.get(), 2);
assert_eq!(fourth_waits.get(), 1);
- orphanage.reap_orphans();
+ drain_orphan_queue(orphanage.queue.lock());
assert_eq!(orphanage.len(), 0);
assert_eq!(first_waits.get(), 1);
assert_eq!(second_waits.get(), 2);
assert_eq!(third_waits.get(), 3);
assert_eq!(fourth_waits.get(), 1);
- orphanage.reap_orphans(); // Safe to reap when empty
+ // Safe to reap when empty
+ drain_orphan_queue(orphanage.queue.lock());
+ }
+
+ #[test]
+ fn no_reap_if_no_signal_received() {
+ let (tx, rx) = watch::channel(());
+
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ *orphanage.sigchild.lock() = Some(rx);
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ tx.send(()).unwrap();
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 1);
+ }
+
+ #[test]
+ fn no_reap_if_signal_lock_held() {
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ let signal_guard = orphanage.sigchild.lock();
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ drop(signal_guard);
+ }
+
+ #[test]
+ fn does_not_register_signal_if_queue_empty() {
+ let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
+ let handle = signal_driver.handle();
+
+ let orphanage = OrphanQueueImpl::new();
+ assert!(orphanage.sigchild.lock().is_none()); // Sanity
+
+ // No register when queue empty
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_none());
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_some());
+ assert_eq!(waits.get(), 1); // Eager reap when registering listener
+ }
+
+ #[test]
+ fn does_nothing_if_signal_could_not_be_registered() {
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ assert!(orphanage.sigchild.lock().is_none());
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ // Signal handler has "gone away", nothing to register or reap
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_none());
+ assert_eq!(waits.get(), 0);
}
}