diff options
Diffstat (limited to 'src/process/unix/mod.rs')
-rw-r--r-- | src/process/unix/mod.rs | 104 |
1 files changed, 88 insertions, 16 deletions
diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs index 576fe6c..78c792c 100644 --- a/src/process/unix/mod.rs +++ b/src/process/unix/mod.rs @@ -21,23 +21,20 @@ //! processes in general aren't scalable (e.g. millions) so it shouldn't be that //! bad in theory... -pub(crate) mod driver; - pub(crate) mod orphan; use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; mod reap; use reap::Reaper; -use crate::io::PollEvented; +use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; use crate::process::kill::Kill; use crate::process::SpawnedChild; -use crate::signal::unix::driver::Handle as SignalHandle; +use crate::runtime::signal::Handle as SignalHandle; use crate::signal::unix::{signal, Signal, SignalKind}; use mio::event::Source; use mio::unix::SourceFd; -use once_cell::sync::Lazy; use std::fmt; use std::fs::File; use std::future::Future; @@ -64,25 +61,41 @@ impl Kill for StdChild { } } -static ORPHAN_QUEUE: Lazy<OrphanQueueImpl<StdChild>> = Lazy::new(OrphanQueueImpl::new); +cfg_not_has_const_mutex_new! { + fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> { + use crate::util::once_cell::OnceCell; + + static ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new(); + + ORPHAN_QUEUE.get(OrphanQueueImpl::new) + } +} + +cfg_has_const_mutex_new! { + fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> { + static ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new(); + + &ORPHAN_QUEUE + } +} pub(crate) struct GlobalOrphanQueue; impl fmt::Debug for GlobalOrphanQueue { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - ORPHAN_QUEUE.fmt(fmt) + get_orphan_queue().fmt(fmt) } } impl GlobalOrphanQueue { - fn reap_orphans(handle: &SignalHandle) { - ORPHAN_QUEUE.reap_orphans(handle) + pub(crate) fn reap_orphans(handle: &SignalHandle) { + get_orphan_queue().reap_orphans(handle) } } impl OrphanQueue<StdChild> for GlobalOrphanQueue { fn push_orphan(&self, orphan: StdChild) { - ORPHAN_QUEUE.push_orphan(orphan) + get_orphan_queue().push_orphan(orphan) } } @@ -143,7 +156,7 @@ impl Future for Child { #[derive(Debug)] pub(crate) struct Pipe { - // Actually a pipe and not a File. However, we are reusing `File` to get + // Actually a pipe is not a File. However, we are reusing `File` to get // close on drop. This is a similar trick as `mio`. fd: File, } @@ -169,6 +182,10 @@ impl<'a> io::Write for &'a Pipe { fn flush(&mut self) -> io::Result<()> { (&self.fd).flush() } + + fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { + (&self.fd).write_vectored(bufs) + } } impl AsRawFd for Pipe { @@ -177,8 +194,8 @@ impl AsRawFd for Pipe { } } -pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> { - let mut fd = io.into_inner()?.fd; +pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> { + let mut fd = io.inner.into_inner()?.fd; // Ensure that the fd to be inherited is set to *blocking* mode, as this // is the default that virtually all programs expect to have. Those @@ -213,7 +230,62 @@ impl Source for Pipe { } } -pub(crate) type ChildStdio = PollEvented<Pipe>; +pub(crate) struct ChildStdio { + inner: PollEvented<Pipe>, +} + +impl fmt::Debug for ChildStdio { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(fmt) + } +} + +impl AsRawFd for ChildStdio { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl AsyncWrite for ChildStdio { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll<Result<usize, io::Error>> { + self.inner.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + true + } +} + +impl AsyncRead for ChildStdio { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + // Safety: pipes support reading into uninitialized memory + unsafe { self.inner.poll_read(cx, buf) } + } +} fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> { unsafe { @@ -238,7 +310,7 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> Ok(()) } -pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>> +pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio> where T: IntoRawFd, { @@ -246,5 +318,5 @@ where let mut pipe = Pipe::from(io); set_nonblocking(&mut pipe, true)?; - PollEvented::new(pipe) + PollEvented::new(pipe).map(|inner| ChildStdio { inner }) } |