aboutsummaryrefslogtreecommitdiff
path: root/src/process/unix/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/process/unix/mod.rs')
-rw-r--r--src/process/unix/mod.rs104
1 files changed, 88 insertions, 16 deletions
diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs
index 576fe6c..78c792c 100644
--- a/src/process/unix/mod.rs
+++ b/src/process/unix/mod.rs
@@ -21,23 +21,20 @@
//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
//! bad in theory...
-pub(crate) mod driver;
-
pub(crate) mod orphan;
use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
mod reap;
use reap::Reaper;
-use crate::io::PollEvented;
+use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::process::kill::Kill;
use crate::process::SpawnedChild;
-use crate::signal::unix::driver::Handle as SignalHandle;
+use crate::runtime::signal::Handle as SignalHandle;
use crate::signal::unix::{signal, Signal, SignalKind};
use mio::event::Source;
use mio::unix::SourceFd;
-use once_cell::sync::Lazy;
use std::fmt;
use std::fs::File;
use std::future::Future;
@@ -64,25 +61,41 @@ impl Kill for StdChild {
}
}
-static ORPHAN_QUEUE: Lazy<OrphanQueueImpl<StdChild>> = Lazy::new(OrphanQueueImpl::new);
+cfg_not_has_const_mutex_new! {
+ fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> {
+ use crate::util::once_cell::OnceCell;
+
+ static ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new();
+
+ ORPHAN_QUEUE.get(OrphanQueueImpl::new)
+ }
+}
+
+cfg_has_const_mutex_new! {
+ fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> {
+ static ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
+
+ &ORPHAN_QUEUE
+ }
+}
pub(crate) struct GlobalOrphanQueue;
impl fmt::Debug for GlobalOrphanQueue {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- ORPHAN_QUEUE.fmt(fmt)
+ get_orphan_queue().fmt(fmt)
}
}
impl GlobalOrphanQueue {
- fn reap_orphans(handle: &SignalHandle) {
- ORPHAN_QUEUE.reap_orphans(handle)
+ pub(crate) fn reap_orphans(handle: &SignalHandle) {
+ get_orphan_queue().reap_orphans(handle)
}
}
impl OrphanQueue<StdChild> for GlobalOrphanQueue {
fn push_orphan(&self, orphan: StdChild) {
- ORPHAN_QUEUE.push_orphan(orphan)
+ get_orphan_queue().push_orphan(orphan)
}
}
@@ -143,7 +156,7 @@ impl Future for Child {
#[derive(Debug)]
pub(crate) struct Pipe {
- // Actually a pipe and not a File. However, we are reusing `File` to get
+ // Actually a pipe is not a File. However, we are reusing `File` to get
// close on drop. This is a similar trick as `mio`.
fd: File,
}
@@ -169,6 +182,10 @@ impl<'a> io::Write for &'a Pipe {
fn flush(&mut self) -> io::Result<()> {
(&self.fd).flush()
}
+
+ fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ (&self.fd).write_vectored(bufs)
+ }
}
impl AsRawFd for Pipe {
@@ -177,8 +194,8 @@ impl AsRawFd for Pipe {
}
}
-pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
- let mut fd = io.into_inner()?.fd;
+pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> {
+ let mut fd = io.inner.into_inner()?.fd;
// Ensure that the fd to be inherited is set to *blocking* mode, as this
// is the default that virtually all programs expect to have. Those
@@ -213,7 +230,62 @@ impl Source for Pipe {
}
}
-pub(crate) type ChildStdio = PollEvented<Pipe>;
+pub(crate) struct ChildStdio {
+ inner: PollEvented<Pipe>,
+}
+
+impl fmt::Debug for ChildStdio {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.inner.fmt(fmt)
+ }
+}
+
+impl AsRawFd for ChildStdio {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+}
+
+impl AsyncWrite for ChildStdio {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_write(cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<Result<usize, io::Error>> {
+ self.inner.poll_write_vectored(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ true
+ }
+}
+
+impl AsyncRead for ChildStdio {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ // Safety: pipes support reading into uninitialized memory
+ unsafe { self.inner.poll_read(cx, buf) }
+ }
+}
fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
@@ -238,7 +310,7 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>
Ok(())
}
-pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
+pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
where
T: IntoRawFd,
{
@@ -246,5 +318,5 @@ where
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;
- PollEvented::new(pipe)
+ PollEvented::new(pipe).map(|inner| ChildStdio { inner })
}