diff options
Diffstat (limited to 'src/process/unix/orphan.rs')
-rw-r--r-- | src/process/unix/orphan.rs | 195 |
1 files changed, 149 insertions, 46 deletions
diff --git a/src/process/unix/orphan.rs b/src/process/unix/orphan.rs index 8a1e127..07f0dcf 100644 --- a/src/process/unix/orphan.rs +++ b/src/process/unix/orphan.rs @@ -1,6 +1,9 @@ +use crate::loom::sync::{Mutex, MutexGuard}; +use crate::signal::unix::driver::Handle as SignalHandle; +use crate::signal::unix::{signal_with_handle, SignalKind}; +use crate::sync::watch; use std::io; use std::process::ExitStatus; -use std::sync::Mutex; /// An interface for waiting on a process to exit. pub(crate) trait Wait { @@ -20,21 +23,8 @@ impl<T: Wait> Wait for &mut T { } } -/// An interface for reaping a set of orphaned processes. -pub(crate) trait ReapOrphanQueue { - /// Attempts to reap every process in the queue, ignoring any errors and - /// enqueueing any orphans which have not yet exited. - fn reap_orphans(&self); -} - -impl<T: ReapOrphanQueue> ReapOrphanQueue for &T { - fn reap_orphans(&self) { - (**self).reap_orphans() - } -} - /// An interface for queueing up an orphaned process so that it can be reaped. -pub(crate) trait OrphanQueue<T>: ReapOrphanQueue { +pub(crate) trait OrphanQueue<T> { /// Adds an orphan to the queue. fn push_orphan(&self, orphan: T); } @@ -48,50 +38,91 @@ impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O { /// An implementation of `OrphanQueue`. #[derive(Debug)] pub(crate) struct OrphanQueueImpl<T> { + sigchild: Mutex<Option<watch::Receiver<()>>>, queue: Mutex<Vec<T>>, } impl<T> OrphanQueueImpl<T> { pub(crate) fn new() -> Self { Self { + sigchild: Mutex::new(None), queue: Mutex::new(Vec::new()), } } #[cfg(test)] fn len(&self) -> usize { - self.queue.lock().unwrap().len() + self.queue.lock().len() } -} -impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> { - fn push_orphan(&self, orphan: T) { - self.queue.lock().unwrap().push(orphan) + pub(crate) fn push_orphan(&self, orphan: T) + where + T: Wait, + { + self.queue.lock().push(orphan) } -} -impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> { - fn reap_orphans(&self) { - let mut queue = self.queue.lock().unwrap(); - let queue = &mut *queue; - - for i in (0..queue.len()).rev() { - match queue[i].try_wait() { - Ok(None) => {} - Ok(Some(_)) | Err(_) => { - // The stdlib handles interruption errors (EINTR) when polling a child process. - // All other errors represent invalid inputs or pids that have already been - // reaped, so we can drop the orphan in case an error is raised. - queue.swap_remove(i); + /// Attempts to reap every process in the queue, ignoring any errors and + /// enqueueing any orphans which have not yet exited. + pub(crate) fn reap_orphans(&self, handle: &SignalHandle) + where + T: Wait, + { + // If someone else is holding the lock, they will be responsible for draining + // the queue as necessary, so we can safely bail if that happens + if let Some(mut sigchild_guard) = self.sigchild.try_lock() { + match &mut *sigchild_guard { + Some(sigchild) => { + if sigchild.try_has_changed().and_then(Result::ok).is_some() { + drain_orphan_queue(self.queue.lock()); + } + } + None => { + let queue = self.queue.lock(); + + // Be lazy and only initialize the SIGCHLD listener if there + // are any orphaned processes in the queue. + if !queue.is_empty() { + // An errors shouldn't really happen here, but if it does it + // means that the signal driver isn't running, in + // which case there isn't anything we can + // register/initialize here, so we can try again later + if let Ok(sigchild) = signal_with_handle(SignalKind::child(), &handle) { + *sigchild_guard = Some(sigchild); + drain_orphan_queue(queue); + } + } } } } } } +fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) +where + T: Wait, +{ + for i in (0..queue.len()).rev() { + match queue[i].try_wait() { + Ok(None) => {} + Ok(Some(_)) | Err(_) => { + // The stdlib handles interruption errors (EINTR) when polling a child process. + // All other errors represent invalid inputs or pids that have already been + // reaped, so we can drop the orphan in case an error is raised. + queue.swap_remove(i); + } + } + } + + drop(queue); +} + #[cfg(all(test, not(loom)))] pub(crate) mod test { use super::*; + use crate::io::driver::Driver as IoDriver; + use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; + use crate::sync::watch; use std::cell::{Cell, RefCell}; use std::io; use std::os::unix::process::ExitStatusExt; @@ -100,14 +131,12 @@ pub(crate) mod test { pub(crate) struct MockQueue<W> { pub(crate) all_enqueued: RefCell<Vec<W>>, - pub(crate) total_reaps: Cell<usize>, } impl<W> MockQueue<W> { pub(crate) fn new() -> Self { Self { all_enqueued: RefCell::new(Vec::new()), - total_reaps: Cell::new(0), } } } @@ -118,12 +147,6 @@ pub(crate) mod test { } } - impl<W> ReapOrphanQueue for MockQueue<W> { - fn reap_orphans(&self) { - self.total_reaps.set(self.total_reaps.get() + 1); - } - } - struct MockWait { total_waits: Rc<Cell<usize>>, num_wait_until_status: usize, @@ -191,27 +214,107 @@ pub(crate) mod test { assert_eq!(orphanage.len(), 4); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 2); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 1); assert_eq!(third_waits.get(), 1); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 1); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 2); assert_eq!(third_waits.get(), 2); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); + drain_orphan_queue(orphanage.queue.lock()); assert_eq!(orphanage.len(), 0); assert_eq!(first_waits.get(), 1); assert_eq!(second_waits.get(), 2); assert_eq!(third_waits.get(), 3); assert_eq!(fourth_waits.get(), 1); - orphanage.reap_orphans(); // Safe to reap when empty + // Safe to reap when empty + drain_orphan_queue(orphanage.queue.lock()); + } + + #[test] + fn no_reap_if_no_signal_received() { + let (tx, rx) = watch::channel(()); + + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + *orphanage.sigchild.lock() = Some(rx); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + tx.send(()).unwrap(); + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 1); + } + + #[test] + fn no_reap_if_signal_lock_held() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + let signal_guard = orphanage.sigchild.lock(); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + drop(signal_guard); + } + + #[test] + fn does_not_register_signal_if_queue_empty() { + let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap(); + let handle = signal_driver.handle(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); // Sanity + + // No register when queue empty + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_some()); + assert_eq!(waits.get(), 1); // Eager reap when registering listener + } + + #[test] + fn does_nothing_if_signal_could_not_be_registered() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + // Signal handler has "gone away", nothing to register or reap + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + assert_eq!(waits.get(), 0); } } |