aboutsummaryrefslogtreecommitdiff
path: root/src/sys/unix/pipe.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sys/unix/pipe.rs')
-rw-r--r--src/sys/unix/pipe.rs161
1 files changed, 149 insertions, 12 deletions
diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs
index ccf5252..7a95b96 100644
--- a/src/sys/unix/pipe.rs
+++ b/src/sys/unix/pipe.rs
@@ -155,6 +155,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
target_os = "netbsd",
target_os = "openbsd",
target_os = "illumos",
+ target_os = "redox",
))]
unsafe {
if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
@@ -162,7 +163,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
}
}
- #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+ #[cfg(any(target_os = "ios", target_os = "macos"))]
unsafe {
// For platforms that don't have `pipe2(2)` we need to manually set the
// correct flags on the file descriptor.
@@ -192,8 +193,8 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
target_os = "openbsd",
target_os = "ios",
target_os = "macos",
- target_os = "solaris",
target_os = "illumos",
+ target_os = "redox",
)))]
compile_error!("unsupported target for `mio::unix::pipe`");
@@ -216,6 +217,74 @@ impl Sender {
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
set_nonblocking(self.inner.as_raw_fd(), nonblocking)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::unix::pipe;
+ ///
+ /// let (sender, receiver) = pipe::new()?;
+ ///
+ /// // Wait until the sender is writable...
+ ///
+ /// // Write to the sender using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = sender.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the receiver is readable...
+ ///
+ /// // Read from the receiver using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = receiver.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl event::Source for Sender {
@@ -244,29 +313,29 @@ impl event::Source for Sender {
impl Write for Sender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write(buf))
+ self.inner.do_io(|mut sender| sender.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|sender| (&*sender).flush())
+ self.inner.do_io(|mut sender| sender.flush())
}
}
impl Write for &Sender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write(buf))
+ self.inner.do_io(|mut sender| sender.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).write_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|sender| (&*sender).flush())
+ self.inner.do_io(|mut sender| sender.flush())
}
}
@@ -313,6 +382,74 @@ impl Receiver {
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
set_nonblocking(self.inner.as_raw_fd(), nonblocking)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::unix::pipe;
+ ///
+ /// let (sender, receiver) = pipe::new()?;
+ ///
+ /// // Wait until the sender is writable...
+ ///
+ /// // Write to the sender using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = sender.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the receiver is readable...
+ ///
+ /// // Read from the receiver using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = receiver.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl event::Source for Receiver {
@@ -341,21 +478,21 @@ impl event::Source for Receiver {
impl Read for Receiver {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read(buf))
+ self.inner.do_io(|mut sender| sender.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.read_vectored(bufs))
}
}
impl Read for &Receiver {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read(buf))
+ self.inner.do_io(|mut sender| sender.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|sender| (&*sender).read_vectored(bufs))
+ self.inner.do_io(|mut sender| sender.read_vectored(bufs))
}
}