diff options
Diffstat (limited to 'src/net/tcp/listener.rs')
-rw-r--r-- | src/net/tcp/listener.rs | 192 |
1 files changed, 50 insertions, 142 deletions
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index fd79b25..3f9bca0 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,7 +1,6 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::tcp::{Incoming, TcpStream}; -use crate::net::ToSocketAddrs; +use crate::net::tcp::TcpStream; +use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; @@ -9,7 +8,7 @@ use std::io; use std::net::{self, SocketAddr}; use std::task::{Context, Poll}; -cfg_tcp! { +cfg_net! { /// A TCP socket server, listening for connections. /// /// You can accept a new connection by using the [`accept`](`TcpListener::accept`) method. Alternatively `TcpListener` @@ -40,7 +39,7 @@ cfg_tcp! { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// loop { /// let (socket, _) = listener.accept().await?; @@ -81,7 +80,7 @@ impl TcpListener { /// method. /// /// The address type can be any implementor of the [`ToSocketAddrs`] trait. - /// Note that strings only implement this trait when the **`dns`** feature + /// Note that strings only implement this trait when the **`net`** feature /// is enabled, as strings may contain domain names that need to be resolved. /// /// If `addr` yields multiple addresses, bind will be attempted with each of @@ -110,27 +109,8 @@ impl TcpListener { /// Ok(()) /// } /// ``` - /// - /// Without the `dns` feature: - /// - /// ```no_run - /// use tokio::net::TcpListener; - /// use std::net::Ipv4Addr; - /// - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let listener = TcpListener::bind((Ipv4Addr::new(127, 0, 0, 1), 2345)).await?; - /// - /// // use the listener - /// - /// # let _ = listener; - /// Ok(()) - /// } - /// ``` pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> { - let addrs = addr.to_socket_addrs().await?; + let addrs = to_socket_addrs(addr).await?; let mut last_err = None; @@ -150,7 +130,7 @@ impl TcpListener { } fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> { - let listener = mio::net::TcpListener::bind(&addr)?; + let listener = mio::net::TcpListener::bind(addr)?; TcpListener::new(listener) } @@ -171,7 +151,7 @@ impl TcpListener { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; /// /// match listener.accept().await { /// Ok((_socket, addr)) => println!("new client: {:?}", addr), @@ -181,66 +161,53 @@ impl TcpListener { /// Ok(()) /// } /// ``` - pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { - poll_fn(|cx| self.poll_accept(cx)).await + pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { + let (mio, addr) = self + .io + .async_io(mio::Interest::READABLE, |sock| sock.accept()) + .await?; + + let stream = TcpStream::new(mio)?; + Ok((stream, addr)) } - /// Attempts to poll `SocketAddr` and `TcpStream` bound to this address. + /// Polls to accept a new incoming connection to this listener. /// - /// In case if I/O resource isn't ready yet, `Poll::Pending` is returned and + /// If there is no connection to accept, `Poll::Pending` is returned and the /// current task will be notified by a waker. - pub fn poll_accept( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<io::Result<(TcpStream, SocketAddr)>> { - let (io, addr) = ready!(self.poll_accept_std(cx))?; - - let io = mio::net::TcpStream::from_stream(io)?; - let io = TcpStream::new(io)?; - - Poll::Ready(Ok((io, addr))) - } - - fn poll_accept_std( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + /// + /// When ready, the most recent task that called `poll_accept` is notified. + /// The caller is responsble to ensure that `poll_accept` is called from a + /// single task. Failing to do this could result in tasks hanging. + pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> { + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept_std() { - Ok(pair) => Poll::Ready(Ok(pair)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + match self.io.get_ref().accept() { + Ok((io, addr)) => { + let io = TcpStream::new(io)?; + return Poll::Ready(Ok((io, addr))); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } /// Creates a new TCP listener from the standard library's TCP listener. /// - /// This method can be used when the `Handle::tcp_listen` method isn't - /// sufficient because perhaps some more configuration is needed in terms of - /// before the calls to `bind` and `listen`. + /// This function is intended to be used to wrap a TCP listener from the + /// standard library in the Tokio equivalent. The conversion assumes nothing + /// about the underlying listener; it is left up to the user to set it in + /// non-blocking mode. /// /// This API is typically paired with the `net2` crate and the `TcpBuilder` /// type to build up and customize a listener before it's shipped off to the /// backing event loop. This allows configuration of options like /// `SO_REUSEPORT`, binding to multiple addresses, etc. /// - /// The `addr` argument here is one of the addresses that `listener` is - /// bound to and the listener will only be guaranteed to accept connections - /// of the same address type currently. - /// - /// The platform specific behavior of this function looks like: - /// - /// * On Unix, the socket is placed into nonblocking mode and connections - /// can be accepted as normal - /// - /// * On Windows, the address is stored internally and all future accepts - /// will only be for the same IP version as `addr` specified. That is, if - /// `addr` is an IPv4 address then all sockets accepted will be IPv4 as - /// well (same for IPv6). /// /// # Examples /// @@ -262,14 +229,14 @@ impl TcpListener { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> { - let io = mio::net::TcpListener::from_std(listener)?; + let io = mio::net::TcpListener::from_std(listener); let io = PollEvented::new(io)?; Ok(TcpListener { io }) } - fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> { + pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> { let io = PollEvented::new(listener)?; Ok(TcpListener { io }) } @@ -301,46 +268,6 @@ impl TcpListener { self.io.get_ref().local_addr() } - /// Returns a stream over the connections being received on this listener. - /// - /// Note that `TcpListener` also directly implements `Stream`. - /// - /// The returned stream will never return `None` and will also not yield the - /// peer's `SocketAddr` structure. Iterating over it is equivalent to - /// calling accept in a loop. - /// - /// # Errors - /// - /// Note that accepting a connection can lead to various errors and not all - /// of them are necessarily fatal ‒ for example having too many open file - /// descriptors or the other side closing the connection while it waits in - /// an accept queue. These would terminate the stream if not handled in any - /// way. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::{net::TcpListener, stream::StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); - /// let mut incoming = listener.incoming(); - /// - /// while let Some(stream) = incoming.next().await { - /// match stream { - /// Ok(stream) => { - /// println!("new client!"); - /// } - /// Err(e) => { /* connection failed */ } - /// } - /// } - /// } - /// ``` - pub fn incoming(&mut self) -> Incoming<'_> { - Incoming::new(self) - } - /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`]. @@ -398,29 +325,12 @@ impl TcpListener { impl crate::stream::Stream for TcpListener { type Item = io::Result<TcpStream>; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let (socket, _) = ready!(self.poll_accept(cx))?; Poll::Ready(Some(Ok(socket))) } } -impl TryFrom<TcpListener> for mio::net::TcpListener { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner - fn try_from(value: TcpListener) -> Result<Self, Self::Error> { - value.io.into_inner() - } -} - impl TryFrom<net::TcpListener> for TcpListener { type Error = io::Error; @@ -453,14 +363,12 @@ mod sys { #[cfg(windows)] mod sys { - // TODO: let's land these upstream with mio and then we can add them here. - // - // use std::os::windows::prelude::*; - // use super::{TcpListener; - // - // impl AsRawHandle for TcpListener { - // fn as_raw_handle(&self) -> RawHandle { - // self.listener.io().as_raw_handle() - // } - // } + use super::TcpListener; + use std::os::windows::prelude::*; + + impl AsRawSocket for TcpListener { + fn as_raw_socket(&self) -> RawSocket { + self.io.get_ref().as_raw_socket() + } + } } |