aboutsummaryrefslogtreecommitdiff
path: root/src/net/unix/listener.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/unix/listener.rs')
-rw-r--r--src/net/unix/listener.rs143
1 files changed, 43 insertions, 100 deletions
diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs
index 9b76cb0..b272645 100644
--- a/src/net/unix/listener.rs
+++ b/src/net/unix/listener.rs
@@ -1,17 +1,15 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::unix::{Incoming, UnixStream};
+use crate::net::unix::{SocketAddr, UnixStream};
-use mio::Ready;
use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::{self, SocketAddr};
+use std::os::unix::net;
use std::path::Path;
use std::task::{Context, Poll};
-cfg_uds! {
+cfg_net_unix! {
/// A Unix socket which can accept connections from other Unix sockets.
///
/// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. Alternatively `UnixListener`
@@ -47,7 +45,7 @@ cfg_uds! {
/// }
/// ```
pub struct UnixListener {
- io: PollEvented<mio_uds::UnixListener>,
+ io: PollEvented<mio::net::UnixListener>,
}
}
@@ -60,12 +58,12 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
{
- let listener = mio_uds::UnixListener::bind(path)?;
+ let listener = mio::net::UnixListener::bind(path)?;
let io = PollEvented::new(listener)?;
Ok(UnixListener { io })
}
@@ -82,16 +80,16 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
- let listener = mio_uds::UnixListener::from_listener(listener)?;
+ let listener = mio::net::UnixListener::from_std(listener);
let io = PollEvented::new(listener)?;
Ok(UnixListener { io })
}
/// Returns the local socket address of this listener.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().local_addr()
+ self.io.get_ref().local_addr().map(SocketAddr)
}
/// Returns the value of the `SO_ERROR` option.
@@ -100,117 +98,62 @@ impl UnixListener {
}
/// Accepts a new incoming connection to this listener.
- pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> {
- poll_fn(|cx| self.poll_accept(cx)).await
+ pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
+ let (mio, addr) = self
+ .io
+ .async_io(mio::Interest::READABLE, |sock| sock.accept())
+ .await?;
+
+ let addr = SocketAddr(addr);
+ let stream = UnixStream::new(mio)?;
+ Ok((stream, addr))
}
- pub(crate) fn poll_accept(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
- let (io, addr) = ready!(self.poll_accept_std(cx))?;
-
- let io = mio_uds::UnixStream::from_stream(io)?;
- Ok((UnixStream::new(io)?, addr)).into()
- }
-
- fn poll_accept_std(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> {
- ready!(self.io.poll_read_ready(cx, Ready::readable()))?;
-
- match self.io.get_ref().accept_std() {
- Ok(None) => {
- self.io.clear_read_ready(cx, Ready::readable())?;
- Poll::Pending
- }
- Ok(Some((sock, addr))) => Ok((sock, addr)).into(),
- Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, Ready::readable())?;
- Poll::Pending
+ /// Polls to accept a new incoming connection to this listener.
+ ///
+ /// If there is no connection to accept, `Poll::Pending` is returned and
+ /// the current task will be notified by a waker.
+ ///
+ /// When ready, the most recent task that called `poll_accept` is notified.
+ /// The caller is responsble to ensure that `poll_accept` is called from a
+ /// single task. Failing to do this could result in tasks hanging.
+ pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ match self.io.get_ref().accept() {
+ Ok((sock, addr)) => {
+ let addr = SocketAddr(addr);
+ let sock = UnixStream::new(sock)?;
+ return Poll::Ready(Ok((sock, addr)));
+ }
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(err) => return Err(err).into(),
}
- Err(err) => Err(err).into(),
}
}
-
- /// Returns a stream over the connections being received on this listener.
- ///
- /// Note that `UnixListener` also directly implements `Stream`.
- ///
- /// The returned stream will never return `None` and will also not yield the
- /// peer's `SocketAddr` structure. Iterating over it is equivalent to
- /// calling accept in a loop.
- ///
- /// # Errors
- ///
- /// Note that accepting a connection can lead to various errors and not all
- /// of them are necessarily fatal ‒ for example having too many open file
- /// descriptors or the other side closing the connection while it waits in
- /// an accept queue. These would terminate the stream if not handled in any
- /// way.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::UnixListener;
- /// use tokio::stream::StreamExt;
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap();
- /// let mut incoming = listener.incoming();
- ///
- /// while let Some(stream) = incoming.next().await {
- /// match stream {
- /// Ok(stream) => {
- /// println!("new client!");
- /// }
- /// Err(e) => { /* connection failed */ }
- /// }
- /// }
- /// }
- /// ```
- pub fn incoming(&mut self) -> Incoming<'_> {
- Incoming::new(self)
- }
}
#[cfg(feature = "stream")]
impl crate::stream::Stream for UnixListener {
type Item = io::Result<UnixStream>;
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
}
-impl TryFrom<UnixListener> for mio_uds::UnixListener {
- type Error = io::Error;
-
- /// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
- fn try_from(value: UnixListener) -> Result<Self, Self::Error> {
- value.io.into_inner()
- }
-}
-
-impl TryFrom<net::UnixListener> for UnixListener {
+impl TryFrom<std::os::unix::net::UnixListener> for UnixListener {
type Error = io::Error;
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`UnixListener::from_std(stream)`](UnixListener::from_std).
- fn try_from(stream: net::UnixListener) -> io::Result<Self> {
+ fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result<Self> {
Self::from_std(stream)
}
}