aboutsummaryrefslogtreecommitdiff
path: root/src/net/unix/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/unix/stream.rs')
-rw-r--r--src/net/unix/stream.rs111
1 files changed, 66 insertions, 45 deletions
diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs
index beae699..5138077 100644
--- a/src/net/unix/stream.rs
+++ b/src/net/unix/stream.rs
@@ -1,27 +1,28 @@
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, PollEvented};
+use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
use crate::net::unix::split::{split, ReadHalf, WriteHalf};
+use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::unix::ucred::{self, UCred};
+use crate::net::unix::SocketAddr;
use std::convert::TryFrom;
use std::fmt;
use std::io::{self, Read, Write};
-use std::mem::MaybeUninit;
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::{self, SocketAddr};
+use std::os::unix::net;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_uds! {
+cfg_net_unix! {
/// A structure representing a connected Unix socket.
///
/// This socket can be connected directly with `UnixStream::connect` or accepted
/// from a listener with `UnixListener::incoming`. Additionally, a pair of
/// anonymous Unix sockets can be created with `UnixStream::pair`.
pub struct UnixStream {
- io: PollEvented<mio_uds::UnixStream>,
+ io: PollEvented<mio::net::UnixStream>,
}
}
@@ -35,7 +36,7 @@ impl UnixStream {
where
P: AsRef<Path>,
{
- let stream = mio_uds::UnixStream::connect(path)?;
+ let stream = mio::net::UnixStream::connect(path)?;
let stream = UnixStream::new(stream)?;
poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
@@ -54,9 +55,9 @@ impl UnixStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
- let stream = mio_uds::UnixStream::from_stream(stream)?;
+ let stream = mio::net::UnixStream::from_std(stream);
let io = PollEvented::new(stream)?;
Ok(UnixStream { io })
@@ -68,26 +69,26 @@ impl UnixStream {
/// communicating back and forth between one another. Each socket will
/// be associated with the default event loop's handle.
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
- let (a, b) = mio_uds::UnixStream::pair()?;
+ let (a, b) = mio::net::UnixStream::pair()?;
let a = UnixStream::new(a)?;
let b = UnixStream::new(b)?;
Ok((a, b))
}
- pub(crate) fn new(stream: mio_uds::UnixStream) -> io::Result<UnixStream> {
+ pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
let io = PollEvented::new(stream)?;
Ok(UnixStream { io })
}
/// Returns the socket address of the local half of this connection.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().local_addr()
+ self.io.get_ref().local_addr().map(SocketAddr)
}
/// Returns the socket address of the remote half of this connection.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().peer_addr()
+ self.io.get_ref().peer_addr().map(SocketAddr)
}
/// Returns effective credentials of the process which called `connect` or `pair`.
@@ -109,24 +110,33 @@ impl UnixStream {
self.io.get_ref().shutdown(how)
}
+ // These lifetime markers also appear in the generated documentation, and make
+ // it more clear that this is a *borrowed* split.
+ #[allow(clippy::needless_lifetimes)]
/// Split a `UnixStream` into a read half and a write half, which can be used
/// to read and write the stream concurrently.
- pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
+ ///
+ /// This method is more efficient than [`into_split`], but the halves cannot be
+ /// moved into independently spawned tasks.
+ ///
+ /// [`into_split`]: Self::into_split()
+ pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
split(self)
}
-}
-impl TryFrom<UnixStream> for mio_uds::UnixStream {
- type Error = io::Error;
-
- /// Consumes value, returning the mio I/O object.
+ /// Splits a `UnixStream` into a read half and a write half, which can be used
+ /// to read and write the stream concurrently.
+ ///
+ /// Unlike [`split`], the owned halves can be moved to separate tasks, however
+ /// this comes at the cost of a heap allocation.
///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
+ /// **Note:** Dropping the write half will shut down the write half of the
+ /// stream. This is equivalent to calling [`shutdown(Write)`] on the `UnixStream`.
///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
- fn try_from(value: UnixStream) -> Result<Self, Self::Error> {
- value.io.into_inner()
+ /// [`split`]: Self::split()
+ /// [`shutdown(Write)`]: fn@Self::shutdown
+ pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
+ split_owned(self)
}
}
@@ -143,15 +153,11 @@ impl TryFrom<net::UnixStream> for UnixStream {
}
impl AsyncRead for UnixStream {
- unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
- false
- }
-
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
self.poll_read_priv(cx, buf)
}
}
@@ -190,16 +196,30 @@ impl UnixStream {
pub(crate) fn poll_read_priv(
&self,
cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().read(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ // Safety: `UnixStream::read` will not peek at the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+ match self.io.get_ref().read(b) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Ok(n) => {
+ // Safety: We trust `UnixStream::read` to have filled up `n` bytes
+ // in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ return Poll::Ready(Ok(()));
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- x => Poll::Ready(x),
}
}
@@ -208,14 +228,15 @@ impl UnixStream {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().write(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
+
+ match self.io.get_ref().write(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ x => return Poll::Ready(x),
}
- x => Poll::Ready(x),
}
}
}