aboutsummaryrefslogtreecommitdiff
path: root/src/net/uds/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/uds/stream.rs')
-rw-r--r--src/net/uds/stream.rs91
1 files changed, 81 insertions, 10 deletions
diff --git a/src/net/uds/stream.rs b/src/net/uds/stream.rs
index f21d9e7..b38812e 100644
--- a/src/net/uds/stream.rs
+++ b/src/net/uds/stream.rs
@@ -15,6 +15,9 @@ pub struct UnixStream {
impl UnixStream {
/// Connects to the socket named by `path`.
+ ///
+ /// This may return a `WouldBlock` in which case the socket connection
+ /// cannot be completed immediately. Usually it means the backlog is full.
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
sys::uds::stream::connect(path.as_ref()).map(UnixStream::from_std)
}
@@ -69,53 +72,121 @@ impl UnixStream {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.inner.shutdown(how)
}
+
+ /// Execute an I/O operation ensuring that the socket receives more events
+ /// if it hits a [`WouldBlock`] error.
+ ///
+ /// # Notes
+ ///
+ /// This method is required to be called for **all** I/O operations to
+ /// ensure the user will receive events once the socket is ready again after
+ /// returning a [`WouldBlock`] error.
+ ///
+ /// [`WouldBlock`]: io::ErrorKind::WouldBlock
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use std::error::Error;
+ /// #
+ /// # fn main() -> Result<(), Box<dyn Error>> {
+ /// use std::io;
+ /// use std::os::unix::io::AsRawFd;
+ /// use mio::net::UnixStream;
+ ///
+ /// let (stream1, stream2) = UnixStream::pair()?;
+ ///
+ /// // Wait until the stream is writable...
+ ///
+ /// // Write to the stream using a direct libc call, of course the
+ /// // `io::Write` implementation would be easier to use.
+ /// let buf = b"hello";
+ /// let n = stream1.try_io(|| {
+ /// let buf_ptr = &buf as *const _ as *const _;
+ /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("write {} bytes", n);
+ ///
+ /// // Wait until the stream is readable...
+ ///
+ /// // Read from the stream using a direct libc call, of course the
+ /// // `io::Read` implementation would be easier to use.
+ /// let mut buf = [0; 512];
+ /// let n = stream2.try_io(|| {
+ /// let buf_ptr = &mut buf as *mut _ as *mut _;
+ /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) };
+ /// if res != -1 {
+ /// Ok(res as usize)
+ /// } else {
+ /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
+ /// // should return `WouldBlock` error.
+ /// Err(io::Error::last_os_error())
+ /// }
+ /// })?;
+ /// eprintln!("read {} bytes", n);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
+ where
+ F: FnOnce() -> io::Result<T>,
+ {
+ self.inner.do_io(|_| f())
+ }
}
impl Read for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read(buf))
+ self.inner.do_io(|mut inner| inner.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.read_vectored(bufs))
}
}
impl<'a> Read for &'a UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read(buf))
+ self.inner.do_io(|mut inner| inner.read(buf))
}
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).read_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.read_vectored(bufs))
}
}
impl Write for UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write(buf))
+ self.inner.do_io(|mut inner| inner.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|inner| (&*inner).flush())
+ self.inner.do_io(|mut inner| inner.flush())
}
}
impl<'a> Write for &'a UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write(buf))
+ self.inner.do_io(|mut inner| inner.write(buf))
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- self.inner.do_io(|inner| (&*inner).write_vectored(bufs))
+ self.inner.do_io(|mut inner| inner.write_vectored(bufs))
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.do_io(|inner| (&*inner).flush())
+ self.inner.do_io(|mut inner| inner.flush())
}
}