aboutsummaryrefslogtreecommitdiff
path: root/src/net/unix
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/unix')
-rw-r--r--src/net/unix/datagram.rs242
-rw-r--r--src/net/unix/datagram/mod.rs3
-rw-r--r--src/net/unix/datagram/socket.rs731
-rw-r--r--src/net/unix/incoming.rs42
-rw-r--r--src/net/unix/listener.rs143
-rw-r--r--src/net/unix/mod.rs12
-rw-r--r--src/net/unix/socketaddr.rs31
-rw-r--r--src/net/unix/split.rs36
-rw-r--r--src/net/unix/split_owned.rs182
-rw-r--r--src/net/unix/stream.rs111
-rw-r--r--src/net/unix/ucred.rs16
11 files changed, 1103 insertions, 446 deletions
diff --git a/src/net/unix/datagram.rs b/src/net/unix/datagram.rs
deleted file mode 100644
index ff0f424..0000000
--- a/src/net/unix/datagram.rs
+++ /dev/null
@@ -1,242 +0,0 @@
-use crate::future::poll_fn;
-use crate::io::PollEvented;
-
-use std::convert::TryFrom;
-use std::fmt;
-use std::io;
-use std::net::Shutdown;
-use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::{self, SocketAddr};
-use std::path::Path;
-use std::task::{Context, Poll};
-
-cfg_uds! {
- /// An I/O object representing a Unix datagram socket.
- pub struct UnixDatagram {
- io: PollEvented<mio_uds::UnixDatagram>,
- }
-}
-
-impl UnixDatagram {
- /// Creates a new `UnixDatagram` bound to the specified path.
- pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
- where
- P: AsRef<Path>,
- {
- let socket = mio_uds::UnixDatagram::bind(path)?;
- UnixDatagram::new(socket)
- }
-
- /// Creates an unnamed pair of connected sockets.
- ///
- /// This function will create a pair of interconnected Unix sockets for
- /// communicating back and forth between one another. Each socket will
- /// be associated with the default event loop's handle.
- pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
- let (a, b) = mio_uds::UnixDatagram::pair()?;
- let a = UnixDatagram::new(a)?;
- let b = UnixDatagram::new(b)?;
-
- Ok((a, b))
- }
-
- /// Consumes a `UnixDatagram` in the standard library and returns a
- /// nonblocking `UnixDatagram` from this crate.
- ///
- /// The returned datagram will be associated with the given event loop
- /// specified by `handle` and is ready to perform I/O.
- ///
- /// # Panics
- ///
- /// This function panics if thread-local runtime is not set.
- ///
- /// The runtime is usually set implicitly when this function is called
- /// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
- pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
- let socket = mio_uds::UnixDatagram::from_datagram(datagram)?;
- let io = PollEvented::new(socket)?;
- Ok(UnixDatagram { io })
- }
-
- fn new(socket: mio_uds::UnixDatagram) -> io::Result<UnixDatagram> {
- let io = PollEvented::new(socket)?;
- Ok(UnixDatagram { io })
- }
-
- /// Creates a new `UnixDatagram` which is not bound to any address.
- pub fn unbound() -> io::Result<UnixDatagram> {
- let socket = mio_uds::UnixDatagram::unbound()?;
- UnixDatagram::new(socket)
- }
-
- /// Connects the socket to the specified address.
- ///
- /// The `send` method may be used to send data to the specified address.
- /// `recv` and `recv_from` will only receive data from that address.
- pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
- self.io.get_ref().connect(path)
- }
-
- /// Sends data on the socket to the socket's peer.
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_send_priv(cx, buf)).await
- }
-
- // Poll IO functions that takes `&self` are provided for the split API.
- //
- // They are not public because (taken from the doc of `PollEvented`):
- //
- // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
- // caller must ensure that there are at most two tasks that use a
- // `PollEvented` instance concurrently. One for reading and one for writing.
- // While violating this requirement is "safe" from a Rust memory model point
- // of view, it will result in unexpected behavior in the form of lost
- // notifications and tasks hanging.
- pub(crate) fn poll_send_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
- /// Receives data from the socket.
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
- }
-
- pub(crate) fn poll_recv_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
- /// Sends data on the socket to the specified address.
- pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
- where
- P: AsRef<Path> + Unpin,
- {
- poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await
- }
-
- pub(crate) fn poll_send_to_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- target: &Path,
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send_to(buf, target) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
- /// Receives data from the socket.
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await
- }
-
- pub(crate) fn poll_recv_from_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<Result<(usize, SocketAddr), io::Error>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv_from(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
- /// Returns the local address that this socket is bound to.
- pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().local_addr()
- }
-
- /// Returns the address of this socket's peer.
- ///
- /// The `connect` method will connect the socket to a peer.
- pub fn peer_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().peer_addr()
- }
-
- /// Returns the value of the `SO_ERROR` option.
- pub fn take_error(&self) -> io::Result<Option<io::Error>> {
- self.io.get_ref().take_error()
- }
-
- /// Shuts down the read, write, or both halves of this connection.
- ///
- /// This function will cause all pending and future I/O calls on the
- /// specified portions to immediately return with an appropriate value
- /// (see the documentation of `Shutdown`).
- pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
- self.io.get_ref().shutdown(how)
- }
-}
-
-impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram {
- type Error = io::Error;
-
- /// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
- fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> {
- value.io.into_inner()
- }
-}
-
-impl TryFrom<net::UnixDatagram> for UnixDatagram {
- type Error = io::Error;
-
- /// Consumes stream, returning the tokio I/O object.
- ///
- /// This is equivalent to
- /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
- fn try_from(stream: net::UnixDatagram) -> Result<Self, Self::Error> {
- Self::from_std(stream)
- }
-}
-
-impl fmt::Debug for UnixDatagram {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.io.get_ref().fmt(f)
- }
-}
-
-impl AsRawFd for UnixDatagram {
- fn as_raw_fd(&self) -> RawFd {
- self.io.get_ref().as_raw_fd()
- }
-}
diff --git a/src/net/unix/datagram/mod.rs b/src/net/unix/datagram/mod.rs
new file mode 100644
index 0000000..6268b4a
--- /dev/null
+++ b/src/net/unix/datagram/mod.rs
@@ -0,0 +1,3 @@
+//! Unix datagram types.
+
+pub(crate) mod socket;
diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs
new file mode 100644
index 0000000..3ae66d1
--- /dev/null
+++ b/src/net/unix/datagram/socket.rs
@@ -0,0 +1,731 @@
+use crate::io::PollEvented;
+use crate::net::unix::SocketAddr;
+
+use std::convert::TryFrom;
+use std::fmt;
+use std::io;
+use std::net::Shutdown;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net;
+use std::path::Path;
+
+cfg_net_unix! {
+ /// An I/O object representing a Unix datagram socket.
+ ///
+ /// A socket can be either named (associated with a filesystem path) or
+ /// unnamed.
+ ///
+ /// **Note:** named sockets are persisted even after the object is dropped
+ /// and the program has exited, and cannot be reconnected. It is advised
+ /// that you either check for and unlink the existing socket if it exists,
+ /// or use a temporary file that is guaranteed to not already exist.
+ ///
+ /// # Examples
+ /// Using named sockets, associated with a filesystem path:
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind each socket to a filesystem path
+ /// let tx_path = tmp.path().join("tx");
+ /// let tx = UnixDatagram::bind(&tx_path)?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// Using unnamed sockets, created as a pair
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hello world";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = vec![0u8; 24];
+ /// let size = sock2.recv(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub struct UnixDatagram {
+ io: PollEvented<mio::net::UnixDatagram>,
+ }
+}
+
+impl UnixDatagram {
+ /// Creates a new `UnixDatagram` bound to the specified path.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind the socket to a filesystem path
+ /// let socket_path = tmp.path().join("socket");
+ /// let socket = UnixDatagram::bind(&socket_path)?;
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
+ where
+ P: AsRef<Path>,
+ {
+ let socket = mio::net::UnixDatagram::bind(path)?;
+ UnixDatagram::new(socket)
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ ///
+ /// This function will create a pair of interconnected Unix sockets for
+ /// communicating back and forth between one another.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hail eris";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = vec![0u8; 24];
+ /// let size = sock2.recv(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
+ let (a, b) = mio::net::UnixDatagram::pair()?;
+ let a = UnixDatagram::new(a)?;
+ let b = UnixDatagram::new(b)?;
+
+ Ok((a, b))
+ }
+
+ /// Consumes a `UnixDatagram` in the standard library and returns a
+ /// nonblocking `UnixDatagram` from this crate.
+ ///
+ /// The returned datagram will be associated with the given event loop
+ /// specified by `handle` and is ready to perform I/O.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if thread-local runtime is not set.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a Tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use std::os::unix::net::UnixDatagram as StdUDS;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind the socket to a filesystem path
+ /// let socket_path = tmp.path().join("socket");
+ /// let std_socket = StdUDS::bind(&socket_path)?;
+ /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
+ let socket = mio::net::UnixDatagram::from_std(datagram);
+ let io = PollEvented::new(socket)?;
+ Ok(UnixDatagram { io })
+ }
+
+ fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
+ let io = PollEvented::new(socket)?;
+ Ok(UnixDatagram { io })
+ }
+
+ /// Creates a new `UnixDatagram` which is not bound to any address.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // Create an unbound socket
+ /// let tx = UnixDatagram::unbound()?;
+ ///
+ /// // Create another, bound socket
+ /// let tmp = tempdir()?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// // Send to the bound socket
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn unbound() -> io::Result<UnixDatagram> {
+ let socket = mio::net::UnixDatagram::unbound()?;
+ UnixDatagram::new(socket)
+ }
+
+ /// Connects the socket to the specified address.
+ ///
+ /// The `send` method may be used to send data to the specified address.
+ /// `recv` and `recv_from` will only receive data from that address.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // Create an unbound socket
+ /// let tx = UnixDatagram::unbound()?;
+ ///
+ /// // Create another, bound socket
+ /// let tmp = tempdir()?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// // Connect to the bound socket
+ /// tx.connect(&rx_path)?;
+ ///
+ /// // Send to the bound socket
+ /// let bytes = b"hello world";
+ /// tx.send(bytes).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
+ self.io.get_ref().connect(path)
+ }
+
+ /// Sends data on the socket to the socket's peer.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hello world";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = vec![0u8; 24];
+ /// let size = sock2.recv(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf))
+ .await
+ }
+
+ /// Try to send a datagram to the peer without waiting.
+ ///
+ /// # Examples
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// let bytes = b"bytes";
+ /// // We use a socket pair so that they are assigned
+ /// // each other as a peer.
+ /// let (first, second) = UnixDatagram::pair()?;
+ ///
+ /// let size = first.try_send(bytes)?;
+ /// assert_eq!(size, bytes.len());
+ ///
+ /// let mut buffer = vec![0u8; 24];
+ /// let size = second.try_recv(&mut buffer)?;
+ ///
+ /// let dgram = &buffer[..size];
+ /// assert_eq!(dgram, bytes);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io.get_ref().send(buf)
+ }
+
+ /// Try to send a datagram to the peer without waiting.
+ ///
+ /// # Examples
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// let bytes = b"bytes";
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir().unwrap();
+ ///
+ /// let server_path = tmp.path().join("server");
+ /// let server = UnixDatagram::bind(&server_path)?;
+ ///
+ /// let client_path = tmp.path().join("client");
+ /// let client = UnixDatagram::bind(&client_path)?;
+ ///
+ /// let size = client.try_send_to(bytes, &server_path)?;
+ /// assert_eq!(size, bytes.len());
+ ///
+ /// let mut buffer = vec![0u8; 24];
+ /// let (size, addr) = server.try_recv_from(&mut buffer)?;
+ ///
+ /// let dgram = &buffer[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &client_path);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
+ where
+ P: AsRef<Path>,
+ {
+ self.io.get_ref().send_to(buf, target)
+ }
+
+ /// Receives data from the socket.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hello world";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = vec![0u8; 24];
+ /// let size = sock2.recv(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Interest::READABLE, |sock| sock.recv(buf))
+ .await
+ }
+
+ /// Try to receive a datagram from the peer without waiting.
+ ///
+ /// # Examples
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// let bytes = b"bytes";
+ /// // We use a socket pair so that they are assigned
+ /// // each other as a peer.
+ /// let (first, second) = UnixDatagram::pair()?;
+ ///
+ /// let size = first.try_send(bytes)?;
+ /// assert_eq!(size, bytes.len());
+ ///
+ /// let mut buffer = vec![0u8; 24];
+ /// let size = second.try_recv(&mut buffer)?;
+ ///
+ /// let dgram = &buffer[..size];
+ /// assert_eq!(dgram, bytes);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.get_ref().recv(buf)
+ }
+
+ /// Sends data on the socket to the specified address.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind each socket to a filesystem path
+ /// let tx_path = tmp.path().join("tx");
+ /// let tx = UnixDatagram::bind(&tx_path)?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
+ where
+ P: AsRef<Path>,
+ {
+ self.io
+ .async_io(mio::Interest::WRITABLE, |sock| {
+ sock.send_to(buf, target.as_ref())
+ })
+ .await
+ }
+
+ /// Receives data from the socket.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind each socket to a filesystem path
+ /// let tx_path = tmp.path().join("tx");
+ /// let tx = UnixDatagram::bind(&tx_path)?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ let (n, addr) = self
+ .io
+ .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf))
+ .await?;
+
+ Ok((n, SocketAddr(addr)))
+ }
+
+ /// Try to receive data from the socket without waiting.
+ ///
+ /// # Examples
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// let bytes = b"bytes";
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir().unwrap();
+ ///
+ /// let server_path = tmp.path().join("server");
+ /// let server = UnixDatagram::bind(&server_path)?;
+ ///
+ /// let client_path = tmp.path().join("client");
+ /// let client = UnixDatagram::bind(&client_path)?;
+ ///
+ /// let size = client.try_send_to(bytes, &server_path)?;
+ /// assert_eq!(size, bytes.len());
+ ///
+ /// let mut buffer = vec![0u8; 24];
+ /// let (size, addr) = server.try_recv_from(&mut buffer)?;
+ ///
+ /// let dgram = &buffer[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &client_path);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ let (n, addr) = self.io.get_ref().recv_from(buf)?;
+ Ok((n, SocketAddr(addr)))
+ }
+
+ /// Returns the local address that this socket is bound to.
+ ///
+ /// # Examples
+ /// For a socket bound to a local path
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind socket to a filesystem path
+ /// let socket_path = tmp.path().join("socket");
+ /// let socket = UnixDatagram::bind(&socket_path)?;
+ ///
+ /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// For an unbound socket
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create an unbound socket
+ /// let socket = UnixDatagram::unbound()?;
+ ///
+ /// assert!(socket.local_addr()?.is_unnamed());
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().local_addr().map(SocketAddr)
+ }
+
+ /// Returns the address of this socket's peer.
+ ///
+ /// The `connect` method will connect the socket to a peer.
+ ///
+ /// # Examples
+ /// For a peer with a local path
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // Create an unbound socket
+ /// let tx = UnixDatagram::unbound()?;
+ ///
+ /// // Create another, bound socket
+ /// let tmp = tempdir()?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// // Connect to the bound socket
+ /// tx.connect(&rx_path)?;
+ ///
+ /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// For an unbound peer
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// assert!(sock1.peer_addr()?.is_unnamed());
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().peer_addr().map(SocketAddr)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create an unbound socket
+ /// let socket = UnixDatagram::unbound()?;
+ ///
+ /// if let Ok(Some(err)) = socket.take_error() {
+ /// println!("Got error: {:?}", err);
+ /// }
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.get_ref().take_error()
+ }
+
+ /// Shuts down the read, write, or both halves of this connection.
+ ///
+ /// This function will cause all pending and future I/O calls on the
+ /// specified portions to immediately return with an appropriate value
+ /// (see the documentation of `Shutdown`).
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use std::net::Shutdown;
+ ///
+ /// // Create an unbound socket
+ /// let (socket, other) = UnixDatagram::pair()?;
+ ///
+ /// socket.shutdown(Shutdown::Both)?;
+ ///
+ /// // NOTE: the following commented out code does NOT work as expected.
+ /// // Due to an underlying issue, the recv call will block indefinitely.
+ /// // See: https://github.com/tokio-rs/tokio/issues/1679
+ /// //let mut buff = vec![0u8; 24];
+ /// //let size = socket.recv(&mut buff).await?;
+ /// //assert_eq!(size, 0);
+ ///
+ /// let send_result = socket.send(b"hello world").await;
+ /// assert!(send_result.is_err());
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ self.io.get_ref().shutdown(how)
+ }
+}
+
+impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
+ type Error = io::Error;
+
+ /// Consumes stream, returning the Tokio I/O object.
+ ///
+ /// This is equivalent to
+ /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
+ fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
+ Self::from_std(stream)
+ }
+}
+
+impl fmt::Debug for UnixDatagram {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.io.get_ref().fmt(f)
+ }
+}
+
+impl AsRawFd for UnixDatagram {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.get_ref().as_raw_fd()
+ }
+}
diff --git a/src/net/unix/incoming.rs b/src/net/unix/incoming.rs
deleted file mode 100644
index af49360..0000000
--- a/src/net/unix/incoming.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-use crate::net::unix::{UnixListener, UnixStream};
-
-use std::io;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-/// Stream of listeners
-#[derive(Debug)]
-#[must_use = "streams do nothing unless polled"]
-pub struct Incoming<'a> {
- inner: &'a mut UnixListener,
-}
-
-impl Incoming<'_> {
- pub(crate) fn new(listener: &mut UnixListener) -> Incoming<'_> {
- Incoming { inner: listener }
- }
-
- /// Attempts to poll `UnixStream` by polling inner `UnixListener` to accept
- /// connection.
- ///
- /// If `UnixListener` isn't ready yet, `Poll::Pending` is returned and
- /// current task will be notified by a waker. Otherwise `Poll::Ready` with
- /// `Result` containing `UnixStream` will be returned.
- pub fn poll_accept(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<UnixStream>> {
- let (socket, _) = ready!(self.inner.poll_accept(cx))?;
- Poll::Ready(Ok(socket))
- }
-}
-
-#[cfg(feature = "stream")]
-impl crate::stream::Stream for Incoming<'_> {
- type Item = io::Result<UnixStream>;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let (socket, _) = ready!(self.inner.poll_accept(cx))?;
- Poll::Ready(Some(Ok(socket)))
- }
-}
diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs
index 9b76cb0..b272645 100644
--- a/src/net/unix/listener.rs
+++ b/src/net/unix/listener.rs
@@ -1,17 +1,15 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::unix::{Incoming, UnixStream};
+use crate::net::unix::{SocketAddr, UnixStream};
-use mio::Ready;
use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::{self, SocketAddr};
+use std::os::unix::net;
use std::path::Path;
use std::task::{Context, Poll};
-cfg_uds! {
+cfg_net_unix! {
/// A Unix socket which can accept connections from other Unix sockets.
///
/// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. Alternatively `UnixListener`
@@ -47,7 +45,7 @@ cfg_uds! {
/// }
/// ```
pub struct UnixListener {
- io: PollEvented<mio_uds::UnixListener>,
+ io: PollEvented<mio::net::UnixListener>,
}
}
@@ -60,12 +58,12 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
{
- let listener = mio_uds::UnixListener::bind(path)?;
+ let listener = mio::net::UnixListener::bind(path)?;
let io = PollEvented::new(listener)?;
Ok(UnixListener { io })
}
@@ -82,16 +80,16 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
- let listener = mio_uds::UnixListener::from_listener(listener)?;
+ let listener = mio::net::UnixListener::from_std(listener);
let io = PollEvented::new(listener)?;
Ok(UnixListener { io })
}
/// Returns the local socket address of this listener.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().local_addr()
+ self.io.get_ref().local_addr().map(SocketAddr)
}
/// Returns the value of the `SO_ERROR` option.
@@ -100,117 +98,62 @@ impl UnixListener {
}
/// Accepts a new incoming connection to this listener.
- pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
- poll_fn(|cx| self.poll_accept(cx)).await
+ pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
+ let (mio, addr) = self
+ .io
+ .async_io(mio::Interest::READABLE, |sock| sock.accept())
+ .await?;
+
+ let addr = SocketAddr(addr);
+ let stream = UnixStream::new(mio)?;
+ Ok((stream, addr))
}
- pub(crate) fn poll_accept(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
- let (io, addr) = ready!(self.poll_accept_std(cx))?;
-
- let io = mio_uds::UnixStream::from_stream(io)?;
- Ok((UnixStream::new(io)?, addr)).into()
- }
-
- fn poll_accept_std(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> {
- ready!(self.io.poll_read_ready(cx, Ready::readable()))?;
-
- match self.io.get_ref().accept_std() {
- Ok(None) => {
- self.io.clear_read_ready(cx, Ready::readable())?;
- Poll::Pending
- }
- Ok(Some((sock, addr))) => Ok((sock, addr)).into(),
- Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, Ready::readable())?;
- Poll::Pending
+ /// Polls to accept a new incoming connection to this listener.
+ ///
+ /// If there is no connection to accept, `Poll::Pending` is returned and
+ /// the current task will be notified by a waker.
+ ///
+ /// When ready, the most recent task that called `poll_accept` is notified.
+ /// The caller is responsble to ensure that `poll_accept` is called from a
+ /// single task. Failing to do this could result in tasks hanging.
+ pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ match self.io.get_ref().accept() {
+ Ok((sock, addr)) => {
+ let addr = SocketAddr(addr);
+ let sock = UnixStream::new(sock)?;
+ return Poll::Ready(Ok((sock, addr)));
+ }
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(err) => return Err(err).into(),
}
- Err(err) => Err(err).into(),
}
}
-
- /// Returns a stream over the connections being received on this listener.
- ///
- /// Note that `UnixListener` also directly implements `Stream`.
- ///
- /// The returned stream will never return `None` and will also not yield the
- /// peer's `SocketAddr` structure. Iterating over it is equivalent to
- /// calling accept in a loop.
- ///
- /// # Errors
- ///
- /// Note that accepting a connection can lead to various errors and not all
- /// of them are necessarily fatal ‒ for example having too many open file
- /// descriptors or the other side closing the connection while it waits in
- /// an accept queue. These would terminate the stream if not handled in any
- /// way.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::UnixListener;
- /// use tokio::stream::StreamExt;
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap();
- /// let mut incoming = listener.incoming();
- ///
- /// while let Some(stream) = incoming.next().await {
- /// match stream {
- /// Ok(stream) => {
- /// println!("new client!");
- /// }
- /// Err(e) => { /* connection failed */ }
- /// }
- /// }
- /// }
- /// ```
- pub fn incoming(&mut self) -> Incoming<'_> {
- Incoming::new(self)
- }
}
#[cfg(feature = "stream")]
impl crate::stream::Stream for UnixListener {
type Item = io::Result<UnixStream>;
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
}
-impl TryFrom<UnixListener> for mio_uds::UnixListener {
- type Error = io::Error;
-
- /// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
- fn try_from(value: UnixListener) -> Result<Self, Self::Error> {
- value.io.into_inner()
- }
-}
-
-impl TryFrom<net::UnixListener> for UnixListener {
+impl TryFrom<std::os::unix::net::UnixListener> for UnixListener {
type Error = io::Error;
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`UnixListener::from_std(stream)`](UnixListener::from_std).
- fn try_from(stream: net::UnixListener) -> io::Result<Self> {
+ fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result<Self> {
Self::from_std(stream)
}
}
diff --git a/src/net/unix/mod.rs b/src/net/unix/mod.rs
index ddba60d..19ee34a 100644
--- a/src/net/unix/mod.rs
+++ b/src/net/unix/mod.rs
@@ -1,16 +1,18 @@
//! Unix domain socket utility types
-pub(crate) mod datagram;
-
-mod incoming;
-pub use incoming::Incoming;
+pub mod datagram;
pub(crate) mod listener;
-pub(crate) use listener::UnixListener;
mod split;
pub use split::{ReadHalf, WriteHalf};
+mod split_owned;
+pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
+
+mod socketaddr;
+pub use socketaddr::SocketAddr;
+
pub(crate) mod stream;
pub(crate) use stream::UnixStream;
diff --git a/src/net/unix/socketaddr.rs b/src/net/unix/socketaddr.rs
new file mode 100644
index 0000000..48f7b96
--- /dev/null
+++ b/src/net/unix/socketaddr.rs
@@ -0,0 +1,31 @@
+use std::fmt;
+use std::path::Path;
+
+/// An address associated with a Tokio Unix socket.
+pub struct SocketAddr(pub(super) mio::net::SocketAddr);
+
+impl SocketAddr {
+ /// Returns `true` if the address is unnamed.
+ ///
+ /// Documentation reflected in [`SocketAddr`]
+ ///
+ /// [`SocketAddr`]: std::os::unix::net::SocketAddr
+ pub fn is_unnamed(&self) -> bool {
+ self.0.is_unnamed()
+ }
+
+ /// Returns the contents of this address if it is a `pathname` address.
+ ///
+ /// Documentation reflected in [`SocketAddr`]
+ ///
+ /// [`SocketAddr`]: std::os::unix::net::SocketAddr
+ pub fn as_pathname(&self) -> Option<&Path> {
+ self.0.as_pathname()
+ }
+}
+
+impl fmt::Debug for SocketAddr {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(fmt)
+ }
+}
diff --git a/src/net/unix/split.rs b/src/net/unix/split.rs
index 9b9fa5e..460bbc1 100644
--- a/src/net/unix/split.rs
+++ b/src/net/unix/split.rs
@@ -8,20 +8,40 @@
//! split has no associated overhead and enforces all invariants at the type
//! level.
-use crate::io::{AsyncRead, AsyncWrite};
+use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::net::UnixStream;
use std::io;
-use std::mem::MaybeUninit;
use std::net::Shutdown;
use std::pin::Pin;
use std::task::{Context, Poll};
-/// Read half of a `UnixStream`.
+/// Borrowed read half of a [`UnixStream`], created by [`split`].
+///
+/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
+/// [`AsyncReadExt`] trait. Examples import this trait through [the prelude].
+///
+/// [`UnixStream`]: UnixStream
+/// [`split`]: UnixStream::split()
+/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
+/// [the prelude]: crate::prelude
#[derive(Debug)]
pub struct ReadHalf<'a>(&'a UnixStream);
-/// Write half of a `UnixStream`.
+/// Borrowed write half of a [`UnixStream`], created by [`split`].
+///
+/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will
+/// shut down the UnixStream stream in the write direction.
+///
+/// Writing to an `WriteHalf` is usually done using the convenience methods found
+/// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude].
+///
+/// [`UnixStream`]: UnixStream
+/// [`split`]: UnixStream::split()
+/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
+/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
+/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+/// [the prelude]: crate::prelude
#[derive(Debug)]
pub struct WriteHalf<'a>(&'a UnixStream);
@@ -30,15 +50,11 @@ pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
}
impl AsyncRead for ReadHalf<'_> {
- unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
- false
- }
-
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
self.0.poll_read_priv(cx, buf)
}
}
diff --git a/src/net/unix/split_owned.rs b/src/net/unix/split_owned.rs
new file mode 100644
index 0000000..ab23307
--- /dev/null
+++ b/src/net/unix/split_owned.rs
@@ -0,0 +1,182 @@
+//! `UnixStream` owned split support.
+//!
+//! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf`
+//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements
+//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`.
+//!
+//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
+//! split has no associated overhead and enforces all invariants at the type
+//! level.
+
+use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::net::UnixStream;
+
+use std::error::Error;
+use std::net::Shutdown;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, io};
+
+/// Owned read half of a [`UnixStream`], created by [`into_split`].
+///
+/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
+/// on the [`AsyncReadExt`] trait. Examples import this trait through [the prelude].
+///
+/// [`UnixStream`]: crate::net::UnixStream
+/// [`into_split`]: crate::net::UnixStream::into_split()
+/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
+/// [the prelude]: crate::prelude
+#[derive(Debug)]
+pub struct OwnedReadHalf {
+ inner: Arc<UnixStream>,
+}
+
+/// Owned write half of a [`UnixStream`], created by [`into_split`].
+///
+/// Note that in the [`AsyncWrite`] implementation of this type,
+/// [`poll_shutdown`] will shut down the stream in the write direction.
+/// Dropping the write half will also shut down the write half of the stream.
+///
+/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods
+/// found on the [`AsyncWriteExt`] trait. Examples import this trait through
+/// [the prelude].
+///
+/// [`UnixStream`]: crate::net::UnixStream
+/// [`into_split`]: crate::net::UnixStream::into_split()
+/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
+/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
+/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+/// [the prelude]: crate::prelude
+#[derive(Debug)]
+pub struct OwnedWriteHalf {
+ inner: Arc<UnixStream>,
+ shutdown_on_drop: bool,
+}
+
+pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
+ let arc = Arc::new(stream);
+ let read = OwnedReadHalf {
+ inner: Arc::clone(&arc),
+ };
+ let write = OwnedWriteHalf {
+ inner: arc,
+ shutdown_on_drop: true,
+ };
+ (read, write)
+}
+
+pub(crate) fn reunite(
+ read: OwnedReadHalf,
+ write: OwnedWriteHalf,
+) -> Result<UnixStream, ReuniteError> {
+ if Arc::ptr_eq(&read.inner, &write.inner) {
+ write.forget();
+ // This unwrap cannot fail as the api does not allow creating more than two Arcs,
+ // and we just dropped the other half.
+ Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite"))
+ } else {
+ Err(ReuniteError(read, write))
+ }
+}
+
+/// Error indicating that two halves were not from the same socket, and thus could
+/// not be reunited.
+#[derive(Debug)]
+pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
+
+impl fmt::Display for ReuniteError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "tried to reunite halves that are not from the same socket"
+ )
+ }
+}
+
+impl Error for ReuniteError {}
+
+impl OwnedReadHalf {
+ /// Attempts to put the two halves of a `UnixStream` back together and
+ /// recover the original socket. Succeeds only if the two halves
+ /// originated from the same call to [`into_split`].
+ ///
+ /// [`into_split`]: crate::net::UnixStream::into_split()
+ pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
+ reunite(self, other)
+ }
+}
+
+impl AsyncRead for OwnedReadHalf {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.inner.poll_read_priv(cx, buf)
+ }
+}
+
+impl OwnedWriteHalf {
+ /// Attempts to put the two halves of a `UnixStream` back together and
+ /// recover the original socket. Succeeds only if the two halves
+ /// originated from the same call to [`into_split`].
+ ///
+ /// [`into_split`]: crate::net::UnixStream::into_split()
+ pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> {
+ reunite(other, self)
+ }
+
+ /// Destroy the write half, but don't close the write half of the stream
+ /// until the read half is dropped. If the read half has already been
+ /// dropped, this closes the stream.
+ pub fn forget(mut self) {
+ self.shutdown_on_drop = false;
+ drop(self);
+ }
+}
+
+impl Drop for OwnedWriteHalf {
+ fn drop(&mut self) {
+ if self.shutdown_on_drop {
+ let _ = self.inner.shutdown(Shutdown::Write);
+ }
+ }
+}
+
+impl AsyncWrite for OwnedWriteHalf {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_write_priv(cx, buf)
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ // flush is a no-op
+ Poll::Ready(Ok(()))
+ }
+
+ // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let res = self.inner.shutdown(Shutdown::Write);
+ if res.is_ok() {
+ Pin::into_inner(self).shutdown_on_drop = false;
+ }
+ res.into()
+ }
+}
+
+impl AsRef<UnixStream> for OwnedReadHalf {
+ fn as_ref(&self) -> &UnixStream {
+ &*self.inner
+ }
+}
+
+impl AsRef<UnixStream> for OwnedWriteHalf {
+ fn as_ref(&self) -> &UnixStream {
+ &*self.inner
+ }
+}
diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs
index beae699..5138077 100644
--- a/src/net/unix/stream.rs
+++ b/src/net/unix/stream.rs
@@ -1,27 +1,28 @@
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, PollEvented};
+use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::net::unix::split::{split, ReadHalf, WriteHalf};
+use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::unix::ucred::{self, UCred};
+use crate::net::unix::SocketAddr;
use std::convert::TryFrom;
use std::fmt;
use std::io::{self, Read, Write};
-use std::mem::MaybeUninit;
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::{self, SocketAddr};
+use std::os::unix::net;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_uds! {
+cfg_net_unix! {
/// A structure representing a connected Unix socket.
///
/// This socket can be connected directly with `UnixStream::connect` or accepted
/// from a listener with `UnixListener::incoming`. Additionally, a pair of
/// anonymous Unix sockets can be created with `UnixStream::pair`.
pub struct UnixStream {
- io: PollEvented<mio_uds::UnixStream>,
+ io: PollEvented<mio::net::UnixStream>,
}
}
@@ -35,7 +36,7 @@ impl UnixStream {
where
P: AsRef<Path>,
{
- let stream = mio_uds::UnixStream::connect(path)?;
+ let stream = mio::net::UnixStream::connect(path)?;
let stream = UnixStream::new(stream)?;
poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
@@ -54,9 +55,9 @@ impl UnixStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
- let stream = mio_uds::UnixStream::from_stream(stream)?;
+ let stream = mio::net::UnixStream::from_std(stream);
let io = PollEvented::new(stream)?;
Ok(UnixStream { io })
@@ -68,26 +69,26 @@ impl UnixStream {
/// communicating back and forth between one another. Each socket will
/// be associated with the default event loop's handle.
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
- let (a, b) = mio_uds::UnixStream::pair()?;
+ let (a, b) = mio::net::UnixStream::pair()?;
let a = UnixStream::new(a)?;
let b = UnixStream::new(b)?;
Ok((a, b))
}
- pub(crate) fn new(stream: mio_uds::UnixStream) -> io::Result<UnixStream> {
+ pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
let io = PollEvented::new(stream)?;
Ok(UnixStream { io })
}
/// Returns the socket address of the local half of this connection.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().local_addr()
+ self.io.get_ref().local_addr().map(SocketAddr)
}
/// Returns the socket address of the remote half of this connection.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().peer_addr()
+ self.io.get_ref().peer_addr().map(SocketAddr)
}
/// Returns effective credentials of the process which called `connect` or `pair`.
@@ -109,24 +110,33 @@ impl UnixStream {
self.io.get_ref().shutdown(how)
}
+ // These lifetime markers also appear in the generated documentation, and make
+ // it more clear that this is a *borrowed* split.
+ #[allow(clippy::needless_lifetimes)]
/// Split a `UnixStream` into a read half and a write half, which can be used
/// to read and write the stream concurrently.
- pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
+ ///
+ /// This method is more efficient than [`into_split`], but the halves cannot be
+ /// moved into independently spawned tasks.
+ ///
+ /// [`into_split`]: Self::into_split()
+ pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
split(self)
}
-}
-impl TryFrom<UnixStream> for mio_uds::UnixStream {
- type Error = io::Error;
-
- /// Consumes value, returning the mio I/O object.
+ /// Splits a `UnixStream` into a read half and a write half, which can be used
+ /// to read and write the stream concurrently.
+ ///
+ /// Unlike [`split`], the owned halves can be moved to separate tasks, however
+ /// this comes at the cost of a heap allocation.
///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
+ /// **Note:** Dropping the write half will shut down the write half of the
+ /// stream. This is equivalent to calling [`shutdown(Write)`] on the `UnixStream`.
///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
- fn try_from(value: UnixStream) -> Result<Self, Self::Error> {
- value.io.into_inner()
+ /// [`split`]: Self::split()
+ /// [`shutdown(Write)`]: fn@Self::shutdown
+ pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
+ split_owned(self)
}
}
@@ -143,15 +153,11 @@ impl TryFrom<net::UnixStream> for UnixStream {
}
impl AsyncRead for UnixStream {
- unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
- false
- }
-
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
self.poll_read_priv(cx, buf)
}
}
@@ -190,16 +196,30 @@ impl UnixStream {
pub(crate) fn poll_read_priv(
&self,
cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().read(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ // Safety: `UnixStream::read` will not peek at the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+ match self.io.get_ref().read(b) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Ok(n) => {
+ // Safety: We trust `UnixStream::read` to have filled up `n` bytes
+ // in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ return Poll::Ready(Ok(()));
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- x => Poll::Ready(x),
}
}
@@ -208,14 +228,15 @@ impl UnixStream {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().write(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
+
+ match self.io.get_ref().write(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ x => return Poll::Ready(x),
}
- x => Poll::Ready(x),
}
}
}
diff --git a/src/net/unix/ucred.rs b/src/net/unix/ucred.rs
index 466aedc..ef214a7 100644
--- a/src/net/unix/ucred.rs
+++ b/src/net/unix/ucred.rs
@@ -4,9 +4,21 @@ use libc::{gid_t, uid_t};
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct UCred {
/// UID (user ID) of the process
- pub uid: uid_t,
+ uid: uid_t,
/// GID (group ID) of the process
- pub gid: gid_t,
+ gid: gid_t,
+}
+
+impl UCred {
+ /// Gets UID (user ID) of the process.
+ pub fn uid(&self) -> uid_t {
+ self.uid
+ }
+
+ /// Gets GID (group ID) of the process.
+ pub fn gid(&self) -> gid_t {
+ self.gid
+ }
}
#[cfg(any(target_os = "linux", target_os = "android"))]