diff options
Diffstat (limited to 'src/net/unix/listener.rs')
-rw-r--r-- | src/net/unix/listener.rs | 143 |
1 files changed, 43 insertions, 100 deletions
diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index 9b76cb0..b272645 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -1,17 +1,15 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::{Incoming, UnixStream}; +use crate::net::unix::{SocketAddr, UnixStream}; -use mio::Ready; use std::convert::TryFrom; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; use std::task::{Context, Poll}; -cfg_uds! { +cfg_net_unix! { /// A Unix socket which can accept connections from other Unix sockets. /// /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method. Alternatively `UnixListener` @@ -47,7 +45,7 @@ cfg_uds! { /// } /// ``` pub struct UnixListener { - io: PollEvented<mio_uds::UnixListener>, + io: PollEvented<mio::net::UnixListener>, } } @@ -60,12 +58,12 @@ impl UnixListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn bind<P>(path: P) -> io::Result<UnixListener> where P: AsRef<Path>, { - let listener = mio_uds::UnixListener::bind(path)?; + let listener = mio::net::UnixListener::bind(path)?; let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } @@ -82,16 +80,16 @@ impl UnixListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> { - let listener = mio_uds::UnixListener::from_listener(listener)?; + let listener = mio::net::UnixListener::from_std(listener); let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } /// Returns the local socket address of this listener. pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR` option. @@ -100,117 +98,62 @@ impl UnixListener { } /// Accepts a new incoming connection to this listener. - pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> { - poll_fn(|cx| self.poll_accept(cx)).await + pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + let (mio, addr) = self + .io + .async_io(mio::Interest::READABLE, |sock| sock.accept()) + .await?; + + let addr = SocketAddr(addr); + let stream = UnixStream::new(mio)?; + Ok((stream, addr)) } - pub(crate) fn poll_accept( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<io::Result<(UnixStream, SocketAddr)>> { - let (io, addr) = ready!(self.poll_accept_std(cx))?; - - let io = mio_uds::UnixStream::from_stream(io)?; - Ok((UnixStream::new(io)?, addr)).into() - } - - fn poll_accept_std( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> { - ready!(self.io.poll_read_ready(cx, Ready::readable()))?; - - match self.io.get_ref().accept_std() { - Ok(None) => { - self.io.clear_read_ready(cx, Ready::readable())?; - Poll::Pending - } - Ok(Some((sock, addr))) => Ok((sock, addr)).into(), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, Ready::readable())?; - Poll::Pending + /// Polls to accept a new incoming connection to this listener. + /// + /// If there is no connection to accept, `Poll::Pending` is returned and + /// the current task will be notified by a waker. + /// + /// When ready, the most recent task that called `poll_accept` is notified. + /// The caller is responsble to ensure that `poll_accept` is called from a + /// single task. Failing to do this could result in tasks hanging. + pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> { + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; + + match self.io.get_ref().accept() { + Ok((sock, addr)) => { + let addr = SocketAddr(addr); + let sock = UnixStream::new(sock)?; + return Poll::Ready(Ok((sock, addr))); + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(err) => return Err(err).into(), } - Err(err) => Err(err).into(), } } - - /// Returns a stream over the connections being received on this listener. - /// - /// Note that `UnixListener` also directly implements `Stream`. - /// - /// The returned stream will never return `None` and will also not yield the - /// peer's `SocketAddr` structure. Iterating over it is equivalent to - /// calling accept in a loop. - /// - /// # Errors - /// - /// Note that accepting a connection can lead to various errors and not all - /// of them are necessarily fatal ‒ for example having too many open file - /// descriptors or the other side closing the connection while it waits in - /// an accept queue. These would terminate the stream if not handled in any - /// way. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UnixListener; - /// use tokio::stream::StreamExt; - /// - /// #[tokio::main] - /// async fn main() { - /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap(); - /// let mut incoming = listener.incoming(); - /// - /// while let Some(stream) = incoming.next().await { - /// match stream { - /// Ok(stream) => { - /// println!("new client!"); - /// } - /// Err(e) => { /* connection failed */ } - /// } - /// } - /// } - /// ``` - pub fn incoming(&mut self) -> Incoming<'_> { - Incoming::new(self) - } } #[cfg(feature = "stream")] impl crate::stream::Stream for UnixListener { type Item = io::Result<UnixStream>; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let (socket, _) = ready!(self.poll_accept(cx))?; Poll::Ready(Some(Ok(socket))) } } -impl TryFrom<UnixListener> for mio_uds::UnixListener { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner - fn try_from(value: UnixListener) -> Result<Self, Self::Error> { - value.io.into_inner() - } -} - -impl TryFrom<net::UnixListener> for UnixListener { +impl TryFrom<std::os::unix::net::UnixListener> for UnixListener { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UnixListener::from_std(stream)`](UnixListener::from_std). - fn try_from(stream: net::UnixListener) -> io::Result<Self> { + fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result<Self> { Self::from_std(stream) } } |