diff options
Diffstat (limited to 'src/net/unix')
-rw-r--r-- | src/net/unix/datagram.rs | 242 | ||||
-rw-r--r-- | src/net/unix/datagram/mod.rs | 3 | ||||
-rw-r--r-- | src/net/unix/datagram/socket.rs | 731 | ||||
-rw-r--r-- | src/net/unix/incoming.rs | 42 | ||||
-rw-r--r-- | src/net/unix/listener.rs | 143 | ||||
-rw-r--r-- | src/net/unix/mod.rs | 12 | ||||
-rw-r--r-- | src/net/unix/socketaddr.rs | 31 | ||||
-rw-r--r-- | src/net/unix/split.rs | 36 | ||||
-rw-r--r-- | src/net/unix/split_owned.rs | 182 | ||||
-rw-r--r-- | src/net/unix/stream.rs | 111 | ||||
-rw-r--r-- | src/net/unix/ucred.rs | 16 |
11 files changed, 1103 insertions, 446 deletions
diff --git a/src/net/unix/datagram.rs b/src/net/unix/datagram.rs deleted file mode 100644 index ff0f424..0000000 --- a/src/net/unix/datagram.rs +++ /dev/null @@ -1,242 +0,0 @@ -use crate::future::poll_fn; -use crate::io::PollEvented; - -use std::convert::TryFrom; -use std::fmt; -use std::io; -use std::net::Shutdown; -use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; -use std::path::Path; -use std::task::{Context, Poll}; - -cfg_uds! { - /// An I/O object representing a Unix datagram socket. - pub struct UnixDatagram { - io: PollEvented<mio_uds::UnixDatagram>, - } -} - -impl UnixDatagram { - /// Creates a new `UnixDatagram` bound to the specified path. - pub fn bind<P>(path: P) -> io::Result<UnixDatagram> - where - P: AsRef<Path>, - { - let socket = mio_uds::UnixDatagram::bind(path)?; - UnixDatagram::new(socket) - } - - /// Creates an unnamed pair of connected sockets. - /// - /// This function will create a pair of interconnected Unix sockets for - /// communicating back and forth between one another. Each socket will - /// be associated with the default event loop's handle. - pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { - let (a, b) = mio_uds::UnixDatagram::pair()?; - let a = UnixDatagram::new(a)?; - let b = UnixDatagram::new(b)?; - - Ok((a, b)) - } - - /// Consumes a `UnixDatagram` in the standard library and returns a - /// nonblocking `UnixDatagram` from this crate. - /// - /// The returned datagram will be associated with the given event loop - /// specified by `handle` and is ready to perform I/O. - /// - /// # Panics - /// - /// This function panics if thread-local runtime is not set. - /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. - pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> { - let socket = mio_uds::UnixDatagram::from_datagram(datagram)?; - let io = PollEvented::new(socket)?; - Ok(UnixDatagram { io }) - } - - fn new(socket: mio_uds::UnixDatagram) -> io::Result<UnixDatagram> { - let io = PollEvented::new(socket)?; - Ok(UnixDatagram { io }) - } - - /// Creates a new `UnixDatagram` which is not bound to any address. - pub fn unbound() -> io::Result<UnixDatagram> { - let socket = mio_uds::UnixDatagram::unbound()?; - UnixDatagram::new(socket) - } - - /// Connects the socket to the specified address. - /// - /// The `send` method may be used to send data to the specified address. - /// `recv` and `recv_from` will only receive data from that address. - pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> { - self.io.get_ref().connect(path) - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await - } - - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv_priv(cx, buf)).await - } - - pub(crate) fn poll_recv_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - - /// Sends data on the socket to the specified address. - pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> - where - P: AsRef<Path> + Unpin, - { - poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - pub(crate) fn poll_send_to_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &Path, - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await - } - - pub(crate) fn poll_recv_from_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<(usize, SocketAddr), io::Error>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - - /// Returns the local address that this socket is bound to. - pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() - } - - /// Returns the address of this socket's peer. - /// - /// The `connect` method will connect the socket to a peer. - pub fn peer_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().peer_addr() - } - - /// Returns the value of the `SO_ERROR` option. - pub fn take_error(&self) -> io::Result<Option<io::Error>> { - self.io.get_ref().take_error() - } - - /// Shuts down the read, write, or both halves of this connection. - /// - /// This function will cause all pending and future I/O calls on the - /// specified portions to immediately return with an appropriate value - /// (see the documentation of `Shutdown`). - pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io.get_ref().shutdown(how) - } -} - -impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner - fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> { - value.io.into_inner() - } -} - -impl TryFrom<net::UnixDatagram> for UnixDatagram { - type Error = io::Error; - - /// Consumes stream, returning the tokio I/O object. - /// - /// This is equivalent to - /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). - fn try_from(stream: net::UnixDatagram) -> Result<Self, Self::Error> { - Self::from_std(stream) - } -} - -impl fmt::Debug for UnixDatagram { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) - } -} - -impl AsRawFd for UnixDatagram { - fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() - } -} diff --git a/src/net/unix/datagram/mod.rs b/src/net/unix/datagram/mod.rs new file mode 100644 index 0000000..6268b4a --- /dev/null +++ b/src/net/unix/datagram/mod.rs @@ -0,0 +1,3 @@ +//! Unix datagram types. + +pub(crate) mod socket; diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs new file mode 100644 index 0000000..3ae66d1 --- /dev/null +++ b/src/net/unix/datagram/socket.rs @@ -0,0 +1,731 @@ +use crate::io::PollEvented; +use crate::net::unix::SocketAddr; + +use std::convert::TryFrom; +use std::fmt; +use std::io; +use std::net::Shutdown; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net; +use std::path::Path; + +cfg_net_unix! { + /// An I/O object representing a Unix datagram socket. + /// + /// A socket can be either named (associated with a filesystem path) or + /// unnamed. + /// + /// **Note:** named sockets are persisted even after the object is dropped + /// and the program has exited, and cannot be reconnected. It is advised + /// that you either check for and unlink the existing socket if it exists, + /// or use a temporary file that is guaranteed to not already exist. + /// + /// # Examples + /// Using named sockets, associated with a filesystem path: + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir()?; + /// + /// // Bind each socket to a filesystem path + /// let tx_path = tmp.path().join("tx"); + /// let tx = UnixDatagram::bind(&tx_path)?; + /// let rx_path = tmp.path().join("rx"); + /// let rx = UnixDatagram::bind(&rx_path)?; + /// + /// let bytes = b"hello world"; + /// tx.send_to(bytes, &rx_path).await?; + /// + /// let mut buf = vec![0u8; 24]; + /// let (size, addr) = rx.recv_from(&mut buf).await?; + /// + /// let dgram = &buf[..size]; + /// assert_eq!(dgram, bytes); + /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); + /// + /// # Ok(()) + /// # } + /// ``` + /// + /// Using unnamed sockets, created as a pair + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// + /// // Create the pair of sockets + /// let (sock1, sock2) = UnixDatagram::pair()?; + /// + /// // Since the sockets are paired, the paired send/recv + /// // functions can be used + /// let bytes = b"hello world"; + /// sock1.send(bytes).await?; + /// + /// let mut buff = vec![0u8; 24]; + /// let size = sock2.recv(&mut buff).await?; + /// + /// let dgram = &buff[..size]; + /// assert_eq!(dgram, bytes); + /// + /// # Ok(()) + /// # } + /// ``` + pub struct UnixDatagram { + io: PollEvented<mio::net::UnixDatagram>, + } +} + +impl UnixDatagram { + /// Creates a new `UnixDatagram` bound to the specified path. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir()?; + /// + /// // Bind the socket to a filesystem path + /// let socket_path = tmp.path().join("socket"); + /// let socket = UnixDatagram::bind(&socket_path)?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn bind<P>(path: P) -> io::Result<UnixDatagram> + where + P: AsRef<Path>, + { + let socket = mio::net::UnixDatagram::bind(path)?; + UnixDatagram::new(socket) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// This function will create a pair of interconnected Unix sockets for + /// communicating back and forth between one another. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// + /// // Create the pair of sockets + /// let (sock1, sock2) = UnixDatagram::pair()?; + /// + /// // Since the sockets are paired, the paired send/recv + /// // functions can be used + /// let bytes = b"hail eris"; + /// sock1.send(bytes).await?; + /// + /// let mut buff = vec![0u8; 24]; + /// let size = sock2.recv(&mut buff).await?; + /// + /// let dgram = &buff[..size]; + /// assert_eq!(dgram, bytes); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { + let (a, b) = mio::net::UnixDatagram::pair()?; + let a = UnixDatagram::new(a)?; + let b = UnixDatagram::new(b)?; + + Ok((a, b)) + } + + /// Consumes a `UnixDatagram` in the standard library and returns a + /// nonblocking `UnixDatagram` from this crate. + /// + /// The returned datagram will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a Tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use std::os::unix::net::UnixDatagram as StdUDS; + /// use tempfile::tempdir; + /// + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir()?; + /// + /// // Bind the socket to a filesystem path + /// let socket_path = tmp.path().join("socket"); + /// let std_socket = StdUDS::bind(&socket_path)?; + /// let tokio_socket = UnixDatagram::from_std(std_socket)?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> { + let socket = mio::net::UnixDatagram::from_std(datagram); + let io = PollEvented::new(socket)?; + Ok(UnixDatagram { io }) + } + + fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> { + let io = PollEvented::new(socket)?; + Ok(UnixDatagram { io }) + } + + /// Creates a new `UnixDatagram` which is not bound to any address. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // Create an unbound socket + /// let tx = UnixDatagram::unbound()?; + /// + /// // Create another, bound socket + /// let tmp = tempdir()?; + /// let rx_path = tmp.path().join("rx"); + /// let rx = UnixDatagram::bind(&rx_path)?; + /// + /// // Send to the bound socket + /// let bytes = b"hello world"; + /// tx.send_to(bytes, &rx_path).await?; + /// + /// let mut buf = vec![0u8; 24]; + /// let (size, addr) = rx.recv_from(&mut buf).await?; + /// + /// let dgram = &buf[..size]; + /// assert_eq!(dgram, bytes); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn unbound() -> io::Result<UnixDatagram> { + let socket = mio::net::UnixDatagram::unbound()?; + UnixDatagram::new(socket) + } + + /// Connects the socket to the specified address. + /// + /// The `send` method may be used to send data to the specified address. + /// `recv` and `recv_from` will only receive data from that address. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // Create an unbound socket + /// let tx = UnixDatagram::unbound()?; + /// + /// // Create another, bound socket + /// let tmp = tempdir()?; + /// let rx_path = tmp.path().join("rx"); + /// let rx = UnixDatagram::bind(&rx_path)?; + /// + /// // Connect to the bound socket + /// tx.connect(&rx_path)?; + /// + /// // Send to the bound socket + /// let bytes = b"hello world"; + /// tx.send(bytes).await?; + /// + /// let mut buf = vec![0u8; 24]; + /// let (size, addr) = rx.recv_from(&mut buf).await?; + /// + /// let dgram = &buf[..size]; + /// assert_eq!(dgram, bytes); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> { + self.io.get_ref().connect(path) + } + + /// Sends data on the socket to the socket's peer. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// + /// // Create the pair of sockets + /// let (sock1, sock2) = UnixDatagram::pair()?; + /// + /// // Since the sockets are paired, the paired send/recv + /// // functions can be used + /// let bytes = b"hello world"; + /// sock1.send(bytes).await?; + /// + /// let mut buff = vec![0u8; 24]; + /// let size = sock2.recv(&mut buff).await?; + /// + /// let dgram = &buff[..size]; + /// assert_eq!(dgram, bytes); + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) + .await + } + + /// Try to send a datagram to the peer without waiting. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// use tokio::net::UnixDatagram; + /// + /// let bytes = b"bytes"; + /// // We use a socket pair so that they are assigned + /// // each other as a peer. + /// let (first, second) = UnixDatagram::pair()?; + /// + /// let size = first.try_send(bytes)?; + /// assert_eq!(size, bytes.len()); + /// + /// let mut buffer = vec![0u8; 24]; + /// let size = second.try_recv(&mut buffer)?; + /// + /// let dgram = &buffer[..size]; + /// assert_eq!(dgram, bytes); + /// # Ok(()) + /// # } + /// ``` + pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { + self.io.get_ref().send(buf) + } + + /// Try to send a datagram to the peer without waiting. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// let bytes = b"bytes"; + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir().unwrap(); + /// + /// let server_path = tmp.path().join("server"); + /// let server = UnixDatagram::bind(&server_path)?; + /// + /// let client_path = tmp.path().join("client"); + /// let client = UnixDatagram::bind(&client_path)?; + /// + /// let size = client.try_send_to(bytes, &server_path)?; + /// assert_eq!(size, bytes.len()); + /// + /// let mut buffer = vec![0u8; 24]; + /// let (size, addr) = server.try_recv_from(&mut buffer)?; + /// + /// let dgram = &buffer[..size]; + /// assert_eq!(dgram, bytes); + /// assert_eq!(addr.as_pathname().unwrap(), &client_path); + /// # Ok(()) + /// # } + /// ``` + pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> + where + P: AsRef<Path>, + { + self.io.get_ref().send_to(buf, target) + } + + /// Receives data from the socket. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// + /// // Create the pair of sockets + /// let (sock1, sock2) = UnixDatagram::pair()?; + /// + /// // Since the sockets are paired, the paired send/recv + /// // functions can be used + /// let bytes = b"hello world"; + /// sock1.send(bytes).await?; + /// + /// let mut buff = vec![0u8; 24]; + /// let size = sock2.recv(&mut buff).await?; + /// + /// let dgram = &buff[..size]; + /// assert_eq!(dgram, bytes); + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io + .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) + .await + } + + /// Try to receive a datagram from the peer without waiting. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// use tokio::net::UnixDatagram; + /// + /// let bytes = b"bytes"; + /// // We use a socket pair so that they are assigned + /// // each other as a peer. + /// let (first, second) = UnixDatagram::pair()?; + /// + /// let size = first.try_send(bytes)?; + /// assert_eq!(size, bytes.len()); + /// + /// let mut buffer = vec![0u8; 24]; + /// let size = second.try_recv(&mut buffer)?; + /// + /// let dgram = &buffer[..size]; + /// assert_eq!(dgram, bytes); + /// # Ok(()) + /// # } + /// ``` + pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io.get_ref().recv(buf) + } + + /// Sends data on the socket to the specified address. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir()?; + /// + /// // Bind each socket to a filesystem path + /// let tx_path = tmp.path().join("tx"); + /// let tx = UnixDatagram::bind(&tx_path)?; + /// let rx_path = tmp.path().join("rx"); + /// let rx = UnixDatagram::bind(&rx_path)?; + /// + /// let bytes = b"hello world"; + /// tx.send_to(bytes, &rx_path).await?; + /// + /// let mut buf = vec![0u8; 24]; + /// let (size, addr) = rx.recv_from(&mut buf).await?; + /// + /// let dgram = &buf[..size]; + /// assert_eq!(dgram, bytes); + /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> + where + P: AsRef<Path>, + { + self.io + .async_io(mio::Interest::WRITABLE, |sock| { + sock.send_to(buf, target.as_ref()) + }) + .await + } + + /// Receives data from the socket. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir()?; + /// + /// // Bind each socket to a filesystem path + /// let tx_path = tmp.path().join("tx"); + /// let tx = UnixDatagram::bind(&tx_path)?; + /// let rx_path = tmp.path().join("rx"); + /// let rx = UnixDatagram::bind(&rx_path)?; + /// + /// let bytes = b"hello world"; + /// tx.send_to(bytes, &rx_path).await?; + /// + /// let mut buf = vec![0u8; 24]; + /// let (size, addr) = rx.recv_from(&mut buf).await?; + /// + /// let dgram = &buf[..size]; + /// assert_eq!(dgram, bytes); + /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + let (n, addr) = self + .io + .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .await?; + + Ok((n, SocketAddr(addr))) + } + + /// Try to receive data from the socket without waiting. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// let bytes = b"bytes"; + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir().unwrap(); + /// + /// let server_path = tmp.path().join("server"); + /// let server = UnixDatagram::bind(&server_path)?; + /// + /// let client_path = tmp.path().join("client"); + /// let client = UnixDatagram::bind(&client_path)?; + /// + /// let size = client.try_send_to(bytes, &server_path)?; + /// assert_eq!(size, bytes.len()); + /// + /// let mut buffer = vec![0u8; 24]; + /// let (size, addr) = server.try_recv_from(&mut buffer)?; + /// + /// let dgram = &buffer[..size]; + /// assert_eq!(dgram, bytes); + /// assert_eq!(addr.as_pathname().unwrap(), &client_path); + /// # Ok(()) + /// # } + /// ``` + pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + let (n, addr) = self.io.get_ref().recv_from(buf)?; + Ok((n, SocketAddr(addr))) + } + + /// Returns the local address that this socket is bound to. + /// + /// # Examples + /// For a socket bound to a local path + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir()?; + /// + /// // Bind socket to a filesystem path + /// let socket_path = tmp.path().join("socket"); + /// let socket = UnixDatagram::bind(&socket_path)?; + /// + /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path); + /// + /// # Ok(()) + /// # } + /// ``` + /// + /// For an unbound socket + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// + /// // Create an unbound socket + /// let socket = UnixDatagram::unbound()?; + /// + /// assert!(socket.local_addr()?.is_unnamed()); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr().map(SocketAddr) + } + + /// Returns the address of this socket's peer. + /// + /// The `connect` method will connect the socket to a peer. + /// + /// # Examples + /// For a peer with a local path + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // Create an unbound socket + /// let tx = UnixDatagram::unbound()?; + /// + /// // Create another, bound socket + /// let tmp = tempdir()?; + /// let rx_path = tmp.path().join("rx"); + /// let rx = UnixDatagram::bind(&rx_path)?; + /// + /// // Connect to the bound socket + /// tx.connect(&rx_path)?; + /// + /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path); + /// + /// # Ok(()) + /// # } + /// ``` + /// + /// For an unbound peer + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// + /// // Create the pair of sockets + /// let (sock1, sock2) = UnixDatagram::pair()?; + /// + /// assert!(sock1.peer_addr()?.is_unnamed()); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().peer_addr().map(SocketAddr) + } + + /// Returns the value of the `SO_ERROR` option. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// + /// // Create an unbound socket + /// let socket = UnixDatagram::unbound()?; + /// + /// if let Ok(Some(err)) = socket.take_error() { + /// println!("Got error: {:?}", err); + /// } + /// + /// # Ok(()) + /// # } + /// ``` + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.io.get_ref().take_error() + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use std::net::Shutdown; + /// + /// // Create an unbound socket + /// let (socket, other) = UnixDatagram::pair()?; + /// + /// socket.shutdown(Shutdown::Both)?; + /// + /// // NOTE: the following commented out code does NOT work as expected. + /// // Due to an underlying issue, the recv call will block indefinitely. + /// // See: https://github.com/tokio-rs/tokio/issues/1679 + /// //let mut buff = vec![0u8; 24]; + /// //let size = socket.recv(&mut buff).await?; + /// //assert_eq!(size, 0); + /// + /// let send_result = socket.send(b"hello world").await; + /// assert!(send_result.is_err()); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } +} + +impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram { + type Error = io::Error; + + /// Consumes stream, returning the Tokio I/O object. + /// + /// This is equivalent to + /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). + fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> { + Self::from_std(stream) + } +} + +impl fmt::Debug for UnixDatagram { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixDatagram { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} diff --git a/src/net/unix/incoming.rs b/src/net/unix/incoming.rs deleted file mode 100644 index af49360..0000000 --- a/src/net/unix/incoming.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::net::unix::{UnixListener, UnixStream}; - -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Stream of listeners -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct Incoming<'a> { - inner: &'a mut UnixListener, -} - -impl Incoming<'_> { - pub(crate) fn new(listener: &mut UnixListener) -> Incoming<'_> { - Incoming { inner: listener } - } - - /// Attempts to poll `UnixStream` by polling inner `UnixListener` to accept - /// connection. - /// - /// If `UnixListener` isn't ready yet, `Poll::Pending` is returned and - /// current task will be notified by a waker. Otherwise `Poll::Ready` with - /// `Result` containing `UnixStream` will be returned. - pub fn poll_accept( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<io::Result<UnixStream>> { - let (socket, _) = ready!(self.inner.poll_accept(cx))?; - Poll::Ready(Ok(socket)) - } -} - -#[cfg(feature = "stream")] -impl crate::stream::Stream for Incoming<'_> { - type Item = io::Result<UnixStream>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let (socket, _) = ready!(self.inner.poll_accept(cx))?; - Poll::Ready(Some(Ok(socket))) - } -} diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index 9b76cb0..b272645 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -1,17 +1,15 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::{Incoming, UnixStream}; +use crate::net::unix::{SocketAddr, UnixStream}; -use mio::Ready; use std::convert::TryFrom; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; use std::task::{Context, Poll}; -cfg_uds! { +cfg_net_unix! { /// A Unix socket which can accept connections from other Unix sockets. /// /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. Alternatively `UnixListener` @@ -47,7 +45,7 @@ cfg_uds! { /// } /// ``` pub struct UnixListener { - io: PollEvented<mio_uds::UnixListener>, + io: PollEvented<mio::net::UnixListener>, } } @@ -60,12 +58,12 @@ impl UnixListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn bind<P>(path: P) -> io::Result<UnixListener> where P: AsRef<Path>, { - let listener = mio_uds::UnixListener::bind(path)?; + let listener = mio::net::UnixListener::bind(path)?; let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } @@ -82,16 +80,16 @@ impl UnixListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> { - let listener = mio_uds::UnixListener::from_listener(listener)?; + let listener = mio::net::UnixListener::from_std(listener); let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } /// Returns the local socket address of this listener. pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR` option. @@ -100,117 +98,62 @@ impl UnixListener { } /// Accepts a new incoming connection to this listener. - pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> { - poll_fn(|cx| self.poll_accept(cx)).await + pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + let (mio, addr) = self + .io + .async_io(mio::Interest::READABLE, |sock| sock.accept()) + .await?; + + let addr = SocketAddr(addr); + let stream = UnixStream::new(mio)?; + Ok((stream, addr)) } - pub(crate) fn poll_accept( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<io::Result<(UnixStream, SocketAddr)>> { - let (io, addr) = ready!(self.poll_accept_std(cx))?; - - let io = mio_uds::UnixStream::from_stream(io)?; - Ok((UnixStream::new(io)?, addr)).into() - } - - fn poll_accept_std( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> { - ready!(self.io.poll_read_ready(cx, Ready::readable()))?; - - match self.io.get_ref().accept_std() { - Ok(None) => { - self.io.clear_read_ready(cx, Ready::readable())?; - Poll::Pending - } - Ok(Some((sock, addr))) => Ok((sock, addr)).into(), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, Ready::readable())?; - Poll::Pending + /// Polls to accept a new incoming connection to this listener. + /// + /// If there is no connection to accept, `Poll::Pending` is returned and + /// the current task will be notified by a waker. + /// + /// When ready, the most recent task that called `poll_accept` is notified. + /// The caller is responsble to ensure that `poll_accept` is called from a + /// single task. Failing to do this could result in tasks hanging. + pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> { + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; + + match self.io.get_ref().accept() { + Ok((sock, addr)) => { + let addr = SocketAddr(addr); + let sock = UnixStream::new(sock)?; + return Poll::Ready(Ok((sock, addr))); + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(err) => return Err(err).into(), } - Err(err) => Err(err).into(), } } - - /// Returns a stream over the connections being received on this listener. - /// - /// Note that `UnixListener` also directly implements `Stream`. - /// - /// The returned stream will never return `None` and will also not yield the - /// peer's `SocketAddr` structure. Iterating over it is equivalent to - /// calling accept in a loop. - /// - /// # Errors - /// - /// Note that accepting a connection can lead to various errors and not all - /// of them are necessarily fatal ‒ for example having too many open file - /// descriptors or the other side closing the connection while it waits in - /// an accept queue. These would terminate the stream if not handled in any - /// way. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UnixListener; - /// use tokio::stream::StreamExt; - /// - /// #[tokio::main] - /// async fn main() { - /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap(); - /// let mut incoming = listener.incoming(); - /// - /// while let Some(stream) = incoming.next().await { - /// match stream { - /// Ok(stream) => { - /// println!("new client!"); - /// } - /// Err(e) => { /* connection failed */ } - /// } - /// } - /// } - /// ``` - pub fn incoming(&mut self) -> Incoming<'_> { - Incoming::new(self) - } } #[cfg(feature = "stream")] impl crate::stream::Stream for UnixListener { type Item = io::Result<UnixStream>; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let (socket, _) = ready!(self.poll_accept(cx))?; Poll::Ready(Some(Ok(socket))) } } -impl TryFrom<UnixListener> for mio_uds::UnixListener { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner - fn try_from(value: UnixListener) -> Result<Self, Self::Error> { - value.io.into_inner() - } -} - -impl TryFrom<net::UnixListener> for UnixListener { +impl TryFrom<std::os::unix::net::UnixListener> for UnixListener { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UnixListener::from_std(stream)`](UnixListener::from_std). - fn try_from(stream: net::UnixListener) -> io::Result<Self> { + fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result<Self> { Self::from_std(stream) } } diff --git a/src/net/unix/mod.rs b/src/net/unix/mod.rs index ddba60d..19ee34a 100644 --- a/src/net/unix/mod.rs +++ b/src/net/unix/mod.rs @@ -1,16 +1,18 @@ //! Unix domain socket utility types -pub(crate) mod datagram; - -mod incoming; -pub use incoming::Incoming; +pub mod datagram; pub(crate) mod listener; -pub(crate) use listener::UnixListener; mod split; pub use split::{ReadHalf, WriteHalf}; +mod split_owned; +pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError}; + +mod socketaddr; +pub use socketaddr::SocketAddr; + pub(crate) mod stream; pub(crate) use stream::UnixStream; diff --git a/src/net/unix/socketaddr.rs b/src/net/unix/socketaddr.rs new file mode 100644 index 0000000..48f7b96 --- /dev/null +++ b/src/net/unix/socketaddr.rs @@ -0,0 +1,31 @@ +use std::fmt; +use std::path::Path; + +/// An address associated with a Tokio Unix socket. +pub struct SocketAddr(pub(super) mio::net::SocketAddr); + +impl SocketAddr { + /// Returns `true` if the address is unnamed. + /// + /// Documentation reflected in [`SocketAddr`] + /// + /// [`SocketAddr`]: std::os::unix::net::SocketAddr + pub fn is_unnamed(&self) -> bool { + self.0.is_unnamed() + } + + /// Returns the contents of this address if it is a `pathname` address. + /// + /// Documentation reflected in [`SocketAddr`] + /// + /// [`SocketAddr`]: std::os::unix::net::SocketAddr + pub fn as_pathname(&self) -> Option<&Path> { + self.0.as_pathname() + } +} + +impl fmt::Debug for SocketAddr { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(fmt) + } +} diff --git a/src/net/unix/split.rs b/src/net/unix/split.rs index 9b9fa5e..460bbc1 100644 --- a/src/net/unix/split.rs +++ b/src/net/unix/split.rs @@ -8,20 +8,40 @@ //! split has no associated overhead and enforces all invariants at the type //! level. -use crate::io::{AsyncRead, AsyncWrite}; +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::net::UnixStream; use std::io; -use std::mem::MaybeUninit; use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; -/// Read half of a `UnixStream`. +/// Borrowed read half of a [`UnixStream`], created by [`split`]. +/// +/// Reading from a `ReadHalf` is usually done using the convenience methods found on the +/// [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [the prelude]: crate::prelude #[derive(Debug)] pub struct ReadHalf<'a>(&'a UnixStream); -/// Write half of a `UnixStream`. +/// Borrowed write half of a [`UnixStream`], created by [`split`]. +/// +/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will +/// shut down the UnixStream stream in the write direction. +/// +/// Writing to an `WriteHalf` is usually done using the convenience methods found +/// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// [the prelude]: crate::prelude #[derive(Debug)] pub struct WriteHalf<'a>(&'a UnixStream); @@ -30,15 +50,11 @@ pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) { } impl AsyncRead for ReadHalf<'_> { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { - false - } - fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { self.0.poll_read_priv(cx, buf) } } diff --git a/src/net/unix/split_owned.rs b/src/net/unix/split_owned.rs new file mode 100644 index 0000000..ab23307 --- /dev/null +++ b/src/net/unix/split_owned.rs @@ -0,0 +1,182 @@ +//! `UnixStream` owned split support. +//! +//! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf` +//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements +//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`. +//! +//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized +//! split has no associated overhead and enforces all invariants at the type +//! level. + +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::net::UnixStream; + +use std::error::Error; +use std::net::Shutdown; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{fmt, io}; + +/// Owned read half of a [`UnixStream`], created by [`into_split`]. +/// +/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found +/// on the [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: crate::net::UnixStream +/// [`into_split`]: crate::net::UnixStream::into_split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [the prelude]: crate::prelude +#[derive(Debug)] +pub struct OwnedReadHalf { + inner: Arc<UnixStream>, +} + +/// Owned write half of a [`UnixStream`], created by [`into_split`]. +/// +/// Note that in the [`AsyncWrite`] implementation of this type, +/// [`poll_shutdown`] will shut down the stream in the write direction. +/// Dropping the write half will also shut down the write half of the stream. +/// +/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods +/// found on the [`AsyncWriteExt`] trait. Examples import this trait through +/// [the prelude]. +/// +/// [`UnixStream`]: crate::net::UnixStream +/// [`into_split`]: crate::net::UnixStream::into_split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// [the prelude]: crate::prelude +#[derive(Debug)] +pub struct OwnedWriteHalf { + inner: Arc<UnixStream>, + shutdown_on_drop: bool, +} + +pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) { + let arc = Arc::new(stream); + let read = OwnedReadHalf { + inner: Arc::clone(&arc), + }; + let write = OwnedWriteHalf { + inner: arc, + shutdown_on_drop: true, + }; + (read, write) +} + +pub(crate) fn reunite( + read: OwnedReadHalf, + write: OwnedWriteHalf, +) -> Result<UnixStream, ReuniteError> { + if Arc::ptr_eq(&read.inner, &write.inner) { + write.forget(); + // This unwrap cannot fail as the api does not allow creating more than two Arcs, + // and we just dropped the other half. + Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite")) + } else { + Err(ReuniteError(read, write)) + } +} + +/// Error indicating that two halves were not from the same socket, and thus could +/// not be reunited. +#[derive(Debug)] +pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf); + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite halves that are not from the same socket" + ) + } +} + +impl Error for ReuniteError {} + +impl OwnedReadHalf { + /// Attempts to put the two halves of a `UnixStream` back together and + /// recover the original socket. Succeeds only if the two halves + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: crate::net::UnixStream::into_split() + pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> { + reunite(self, other) + } +} + +impl AsyncRead for OwnedReadHalf { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.inner.poll_read_priv(cx, buf) + } +} + +impl OwnedWriteHalf { + /// Attempts to put the two halves of a `UnixStream` back together and + /// recover the original socket. Succeeds only if the two halves + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: crate::net::UnixStream::into_split() + pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> { + reunite(other, self) + } + + /// Destroy the write half, but don't close the write half of the stream + /// until the read half is dropped. If the read half has already been + /// dropped, this closes the stream. + pub fn forget(mut self) { + self.shutdown_on_drop = false; + drop(self); + } +} + +impl Drop for OwnedWriteHalf { + fn drop(&mut self) { + if self.shutdown_on_drop { + let _ = self.inner.shutdown(Shutdown::Write); + } + } +} + +impl AsyncWrite for OwnedWriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner.poll_write_priv(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + // flush is a no-op + Poll::Ready(Ok(())) + } + + // `poll_shutdown` on a write half shutdowns the stream in the "write" direction. + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + let res = self.inner.shutdown(Shutdown::Write); + if res.is_ok() { + Pin::into_inner(self).shutdown_on_drop = false; + } + res.into() + } +} + +impl AsRef<UnixStream> for OwnedReadHalf { + fn as_ref(&self) -> &UnixStream { + &*self.inner + } +} + +impl AsRef<UnixStream> for OwnedWriteHalf { + fn as_ref(&self) -> &UnixStream { + &*self.inner + } +} diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index beae699..5138077 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -1,27 +1,28 @@ use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, PollEvented}; +use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; use crate::net::unix::split::{split, ReadHalf, WriteHalf}; +use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::unix::ucred::{self, UCred}; +use crate::net::unix::SocketAddr; use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; -use std::mem::MaybeUninit; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_uds! { +cfg_net_unix! { /// A structure representing a connected Unix socket. /// /// This socket can be connected directly with `UnixStream::connect` or accepted /// from a listener with `UnixListener::incoming`. Additionally, a pair of /// anonymous Unix sockets can be created with `UnixStream::pair`. pub struct UnixStream { - io: PollEvented<mio_uds::UnixStream>, + io: PollEvented<mio::net::UnixStream>, } } @@ -35,7 +36,7 @@ impl UnixStream { where P: AsRef<Path>, { - let stream = mio_uds::UnixStream::connect(path)?; + let stream = mio::net::UnixStream::connect(path)?; let stream = UnixStream::new(stream)?; poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; @@ -54,9 +55,9 @@ impl UnixStream { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> { - let stream = mio_uds::UnixStream::from_stream(stream)?; + let stream = mio::net::UnixStream::from_std(stream); let io = PollEvented::new(stream)?; Ok(UnixStream { io }) @@ -68,26 +69,26 @@ impl UnixStream { /// communicating back and forth between one another. Each socket will /// be associated with the default event loop's handle. pub fn pair() -> io::Result<(UnixStream, UnixStream)> { - let (a, b) = mio_uds::UnixStream::pair()?; + let (a, b) = mio::net::UnixStream::pair()?; let a = UnixStream::new(a)?; let b = UnixStream::new(b)?; Ok((a, b)) } - pub(crate) fn new(stream: mio_uds::UnixStream) -> io::Result<UnixStream> { + pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> { let io = PollEvented::new(stream)?; Ok(UnixStream { io }) } /// Returns the socket address of the local half of this connection. pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the socket address of the remote half of this connection. pub fn peer_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().peer_addr() + self.io.get_ref().peer_addr().map(SocketAddr) } /// Returns effective credentials of the process which called `connect` or `pair`. @@ -109,24 +110,33 @@ impl UnixStream { self.io.get_ref().shutdown(how) } + // These lifetime markers also appear in the generated documentation, and make + // it more clear that this is a *borrowed* split. + #[allow(clippy::needless_lifetimes)] /// Split a `UnixStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. - pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { + /// + /// This method is more efficient than [`into_split`], but the halves cannot be + /// moved into independently spawned tasks. + /// + /// [`into_split`]: Self::into_split() + pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { split(self) } -} -impl TryFrom<UnixStream> for mio_uds::UnixStream { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. + /// Splits a `UnixStream` into a read half and a write half, which can be used + /// to read and write the stream concurrently. + /// + /// Unlike [`split`], the owned halves can be moved to separate tasks, however + /// this comes at the cost of a heap allocation. /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. + /// **Note:** Dropping the write half will shut down the write half of the + /// stream. This is equivalent to calling [`shutdown(Write)`] on the `UnixStream`. /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner - fn try_from(value: UnixStream) -> Result<Self, Self::Error> { - value.io.into_inner() + /// [`split`]: Self::split() + /// [`shutdown(Write)`]: fn@Self::shutdown + pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { + split_owned(self) } } @@ -143,15 +153,11 @@ impl TryFrom<net::UnixStream> for UnixStream { } impl AsyncRead for UnixStream { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { - false - } - fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { self.poll_read_priv(cx, buf) } } @@ -190,16 +196,30 @@ impl UnixStream { pub(crate) fn poll_read_priv( &self, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().read(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; + + // Safety: `UnixStream::read` will not peek at the maybe uinitialized bytes. + let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) + }; + match self.io.get_ref().read(b) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Ok(n) => { + // Safety: We trust `UnixStream::read` to have filled up `n` bytes + // in the buffer. + unsafe { + buf.assume_init(n); + } + buf.advance(n); + return Poll::Ready(Ok(())); + } + Err(e) => return Poll::Ready(Err(e)), } - x => Poll::Ready(x), } } @@ -208,14 +228,15 @@ impl UnixStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + x => return Poll::Ready(x), } - x => Poll::Ready(x), } } } diff --git a/src/net/unix/ucred.rs b/src/net/unix/ucred.rs index 466aedc..ef214a7 100644 --- a/src/net/unix/ucred.rs +++ b/src/net/unix/ucred.rs @@ -4,9 +4,21 @@ use libc::{gid_t, uid_t}; #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] pub struct UCred { /// UID (user ID) of the process - pub uid: uid_t, + uid: uid_t, /// GID (group ID) of the process - pub gid: gid_t, + gid: gid_t, +} + +impl UCred { + /// Gets UID (user ID) of the process. + pub fn uid(&self) -> uid_t { + self.uid + } + + /// Gets GID (group ID) of the process. + pub fn gid(&self) -> gid_t { + self.gid + } } #[cfg(any(target_os = "linux", target_os = "android"))] |