diff options
Diffstat (limited to 'src/sys/unix/pipe.rs')
-rw-r--r-- | src/sys/unix/pipe.rs | 161 |
1 files changed, 149 insertions, 12 deletions
diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs index ccf5252..7a95b96 100644 --- a/src/sys/unix/pipe.rs +++ b/src/sys/unix/pipe.rs @@ -155,6 +155,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> { target_os = "netbsd", target_os = "openbsd", target_os = "illumos", + target_os = "redox", ))] unsafe { if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { @@ -162,7 +163,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> { } } - #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))] + #[cfg(any(target_os = "ios", target_os = "macos"))] unsafe { // For platforms that don't have `pipe2(2)` we need to manually set the // correct flags on the file descriptor. @@ -192,8 +193,8 @@ pub fn new() -> io::Result<(Sender, Receiver)> { target_os = "openbsd", target_os = "ios", target_os = "macos", - target_os = "solaris", target_os = "illumos", + target_os = "redox", )))] compile_error!("unsupported target for `mio::unix::pipe`"); @@ -216,6 +217,74 @@ impl Sender { pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { set_nonblocking(self.inner.as_raw_fd(), nonblocking) } + + /// Execute an I/O operation ensuring that the socket receives more events + /// if it hits a [`WouldBlock`] error. + /// + /// # Notes + /// + /// This method is required to be called for **all** I/O operations to + /// ensure the user will receive events once the socket is ready again after + /// returning a [`WouldBlock`] error. + /// + /// [`WouldBlock`]: io::ErrorKind::WouldBlock + /// + /// # Examples + /// + /// ``` + /// # use std::error::Error; + /// # + /// # fn main() -> Result<(), Box<dyn Error>> { + /// use std::io; + /// use std::os::unix::io::AsRawFd; + /// use mio::unix::pipe; + /// + /// let (sender, receiver) = pipe::new()?; + /// + /// // Wait until the sender is writable... + /// + /// // Write to the sender using a direct libc call, of course the + /// // `io::Write` implementation would be easier to use. + /// let buf = b"hello"; + /// let n = sender.try_io(|| { + /// let buf_ptr = &buf as *const _ as *const _; + /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) }; + /// if res != -1 { + /// Ok(res as usize) + /// } else { + /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure + /// // should return `WouldBlock` error. + /// Err(io::Error::last_os_error()) + /// } + /// })?; + /// eprintln!("write {} bytes", n); + /// + /// // Wait until the receiver is readable... + /// + /// // Read from the receiver using a direct libc call, of course the + /// // `io::Read` implementation would be easier to use. + /// let mut buf = [0; 512]; + /// let n = receiver.try_io(|| { + /// let buf_ptr = &mut buf as *mut _ as *mut _; + /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) }; + /// if res != -1 { + /// Ok(res as usize) + /// } else { + /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure + /// // should return `WouldBlock` error. + /// Err(io::Error::last_os_error()) + /// } + /// })?; + /// eprintln!("read {} bytes", n); + /// # Ok(()) + /// # } + /// ``` + pub fn try_io<F, T>(&self, f: F) -> io::Result<T> + where + F: FnOnce() -> io::Result<T>, + { + self.inner.do_io(|_| f()) + } } impl event::Source for Sender { @@ -244,29 +313,29 @@ impl event::Source for Sender { impl Write for Sender { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.inner.do_io(|sender| (&*sender).write(buf)) + self.inner.do_io(|mut sender| sender.write(buf)) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { - self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + self.inner.do_io(|mut sender| sender.write_vectored(bufs)) } fn flush(&mut self) -> io::Result<()> { - self.inner.do_io(|sender| (&*sender).flush()) + self.inner.do_io(|mut sender| sender.flush()) } } impl Write for &Sender { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.inner.do_io(|sender| (&*sender).write(buf)) + self.inner.do_io(|mut sender| sender.write(buf)) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { - self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + self.inner.do_io(|mut sender| sender.write_vectored(bufs)) } fn flush(&mut self) -> io::Result<()> { - self.inner.do_io(|sender| (&*sender).flush()) + self.inner.do_io(|mut sender| sender.flush()) } } @@ -313,6 +382,74 @@ impl Receiver { pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { set_nonblocking(self.inner.as_raw_fd(), nonblocking) } + + /// Execute an I/O operation ensuring that the socket receives more events + /// if it hits a [`WouldBlock`] error. + /// + /// # Notes + /// + /// This method is required to be called for **all** I/O operations to + /// ensure the user will receive events once the socket is ready again after + /// returning a [`WouldBlock`] error. + /// + /// [`WouldBlock`]: io::ErrorKind::WouldBlock + /// + /// # Examples + /// + /// ``` + /// # use std::error::Error; + /// # + /// # fn main() -> Result<(), Box<dyn Error>> { + /// use std::io; + /// use std::os::unix::io::AsRawFd; + /// use mio::unix::pipe; + /// + /// let (sender, receiver) = pipe::new()?; + /// + /// // Wait until the sender is writable... + /// + /// // Write to the sender using a direct libc call, of course the + /// // `io::Write` implementation would be easier to use. + /// let buf = b"hello"; + /// let n = sender.try_io(|| { + /// let buf_ptr = &buf as *const _ as *const _; + /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) }; + /// if res != -1 { + /// Ok(res as usize) + /// } else { + /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure + /// // should return `WouldBlock` error. + /// Err(io::Error::last_os_error()) + /// } + /// })?; + /// eprintln!("write {} bytes", n); + /// + /// // Wait until the receiver is readable... + /// + /// // Read from the receiver using a direct libc call, of course the + /// // `io::Read` implementation would be easier to use. + /// let mut buf = [0; 512]; + /// let n = receiver.try_io(|| { + /// let buf_ptr = &mut buf as *mut _ as *mut _; + /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) }; + /// if res != -1 { + /// Ok(res as usize) + /// } else { + /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure + /// // should return `WouldBlock` error. + /// Err(io::Error::last_os_error()) + /// } + /// })?; + /// eprintln!("read {} bytes", n); + /// # Ok(()) + /// # } + /// ``` + pub fn try_io<F, T>(&self, f: F) -> io::Result<T> + where + F: FnOnce() -> io::Result<T>, + { + self.inner.do_io(|_| f()) + } } impl event::Source for Receiver { @@ -341,21 +478,21 @@ impl event::Source for Receiver { impl Read for Receiver { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.inner.do_io(|sender| (&*sender).read(buf)) + self.inner.do_io(|mut sender| sender.read(buf)) } fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { - self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + self.inner.do_io(|mut sender| sender.read_vectored(bufs)) } } impl Read for &Receiver { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.inner.do_io(|sender| (&*sender).read(buf)) + self.inner.do_io(|mut sender| sender.read(buf)) } fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { - self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + self.inner.do_io(|mut sender| sender.read_vectored(bufs)) } } |