diff options
Diffstat (limited to 'src/net/uds/stream.rs')
-rw-r--r-- | src/net/uds/stream.rs | 91 |
1 files changed, 81 insertions, 10 deletions
diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs index f21d9e7..b38812e 100644 --- a/src/net/uds/stream.rs +++ b/src/net/uds/stream.rs @@ -15,6 +15,9 @@ pub struct UnixStream { impl UnixStream { /// Connects to the socket named by `path`. + /// + /// This may return a `WouldBlock` in which case the socket connection + /// cannot be completed immediately. Usually it means the backlog is full. pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { sys::uds::stream::connect(path.as_ref()).map(UnixStream::from_std) } @@ -69,53 +72,121 @@ impl UnixStream { pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.inner.shutdown(how) } + + /// Execute an I/O operation ensuring that the socket receives more events + /// if it hits a [`WouldBlock`] error. + /// + /// # Notes + /// + /// This method is required to be called for **all** I/O operations to + /// ensure the user will receive events once the socket is ready again after + /// returning a [`WouldBlock`] error. + /// + /// [`WouldBlock`]: io::ErrorKind::WouldBlock + /// + /// # Examples + /// + /// ``` + /// # use std::error::Error; + /// # + /// # fn main() -> Result<(), Box<dyn Error>> { + /// use std::io; + /// use std::os::unix::io::AsRawFd; + /// use mio::net::UnixStream; + /// + /// let (stream1, stream2) = UnixStream::pair()?; + /// + /// // Wait until the stream is writable... + /// + /// // Write to the stream using a direct libc call, of course the + /// // `io::Write` implementation would be easier to use. + /// let buf = b"hello"; + /// let n = stream1.try_io(|| { + /// let buf_ptr = &buf as *const _ as *const _; + /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) }; + /// if res != -1 { + /// Ok(res as usize) + /// } else { + /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure + /// // should return `WouldBlock` error. + /// Err(io::Error::last_os_error()) + /// } + /// })?; + /// eprintln!("write {} bytes", n); + /// + /// // Wait until the stream is readable... + /// + /// // Read from the stream using a direct libc call, of course the + /// // `io::Read` implementation would be easier to use. + /// let mut buf = [0; 512]; + /// let n = stream2.try_io(|| { + /// let buf_ptr = &mut buf as *mut _ as *mut _; + /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) }; + /// if res != -1 { + /// Ok(res as usize) + /// } else { + /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure + /// // should return `WouldBlock` error. + /// Err(io::Error::last_os_error()) + /// } + /// })?; + /// eprintln!("read {} bytes", n); + /// # Ok(()) + /// # } + /// ``` + pub fn try_io<F, T>(&self, f: F) -> io::Result<T> + where + F: FnOnce() -> io::Result<T>, + { + self.inner.do_io(|_| f()) + } } impl Read for UnixStream { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.inner.do_io(|inner| (&*inner).read(buf)) + self.inner.do_io(|mut inner| inner.read(buf)) } fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { - self.inner.do_io(|inner| (&*inner).read_vectored(bufs)) + self.inner.do_io(|mut inner| inner.read_vectored(bufs)) } } impl<'a> Read for &'a UnixStream { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.inner.do_io(|inner| (&*inner).read(buf)) + self.inner.do_io(|mut inner| inner.read(buf)) } fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { - self.inner.do_io(|inner| (&*inner).read_vectored(bufs)) + self.inner.do_io(|mut inner| inner.read_vectored(bufs)) } } impl Write for UnixStream { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.inner.do_io(|inner| (&*inner).write(buf)) + self.inner.do_io(|mut inner| inner.write(buf)) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { - self.inner.do_io(|inner| (&*inner).write_vectored(bufs)) + self.inner.do_io(|mut inner| inner.write_vectored(bufs)) } fn flush(&mut self) -> io::Result<()> { - self.inner.do_io(|inner| (&*inner).flush()) + self.inner.do_io(|mut inner| inner.flush()) } } impl<'a> Write for &'a UnixStream { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.inner.do_io(|inner| (&*inner).write(buf)) + self.inner.do_io(|mut inner| inner.write(buf)) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { - self.inner.do_io(|inner| (&*inner).write_vectored(bufs)) + self.inner.do_io(|mut inner| inner.write_vectored(bufs)) } fn flush(&mut self) -> io::Result<()> { - self.inner.do_io(|inner| (&*inner).flush()) + self.inner.do_io(|mut inner| inner.flush()) } } |