aboutsummaryrefslogtreecommitdiff
path: root/src/net/tcp/listener.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/tcp/listener.rs')
-rw-r--r--src/net/tcp/listener.rs192
1 files changed, 50 insertions, 142 deletions
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs
index fd79b25..3f9bca0 100644
--- a/src/net/tcp/listener.rs
+++ b/src/net/tcp/listener.rs
@@ -1,7 +1,6 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::tcp::{Incoming, TcpStream};
-use crate::net::ToSocketAddrs;
+use crate::net::tcp::TcpStream;
+use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
use std::fmt;
@@ -9,7 +8,7 @@ use std::io;
use std::net::{self, SocketAddr};
use std::task::{Context, Poll};
-cfg_tcp! {
+cfg_net! {
/// A TCP socket server, listening for connections.
///
/// You can accept a new connection by using the [`accept`](`TcpListener::accept`) method. Alternatively `TcpListener`
@@ -40,7 +39,7 @@ cfg_tcp! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+ /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
@@ -81,7 +80,7 @@ impl TcpListener {
/// method.
///
/// The address type can be any implementor of the [`ToSocketAddrs`] trait.
- /// Note that strings only implement this trait when the **`dns`** feature
+ /// Note that strings only implement this trait when the **`net`** feature
/// is enabled, as strings may contain domain names that need to be resolved.
///
/// If `addr` yields multiple addresses, bind will be attempted with each of
@@ -110,27 +109,8 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
- ///
- /// Without the `dns` feature:
- ///
- /// ```no_run
- /// use tokio::net::TcpListener;
- /// use std::net::Ipv4Addr;
- ///
- /// use std::io;
- ///
- /// #[tokio::main]
- /// async fn main() -> io::Result<()> {
- /// let listener = TcpListener::bind((Ipv4Addr::new(127, 0, 0, 1), 2345)).await?;
- ///
- /// // use the listener
- ///
- /// # let _ = listener;
- /// Ok(())
- /// }
- /// ```
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
- let addrs = addr.to_socket_addrs().await?;
+ let addrs = to_socket_addrs(addr).await?;
let mut last_err = None;
@@ -150,7 +130,7 @@ impl TcpListener {
}
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
- let listener = mio::net::TcpListener::bind(&addr)?;
+ let listener = mio::net::TcpListener::bind(addr)?;
TcpListener::new(listener)
}
@@ -171,7 +151,7 @@ impl TcpListener {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+ /// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// match listener.accept().await {
/// Ok((_socket, addr)) => println!("new client: {:?}", addr),
@@ -181,66 +161,53 @@ impl TcpListener {
/// Ok(())
/// }
/// ```
- pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
- poll_fn(|cx| self.poll_accept(cx)).await
+ pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
+ let (mio, addr) = self
+ .io
+ .async_io(mio::Interest::READABLE, |sock| sock.accept())
+ .await?;
+
+ let stream = TcpStream::new(mio)?;
+ Ok((stream, addr))
}
- /// Attempts to poll `SocketAddr` and `TcpStream` bound to this address.
+ /// Polls to accept a new incoming connection to this listener.
///
- /// In case if I/O resource isn't ready yet, `Poll::Pending` is returned and
+ /// If there is no connection to accept, `Poll::Pending` is returned and the
/// current task will be notified by a waker.
- pub fn poll_accept(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
- let (io, addr) = ready!(self.poll_accept_std(cx))?;
-
- let io = mio::net::TcpStream::from_stream(io)?;
- let io = TcpStream::new(io)?;
-
- Poll::Ready(Ok((io, addr)))
- }
-
- fn poll_accept_std(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
+ ///
+ /// When ready, the most recent task that called `poll_accept` is notified.
+ /// The caller is responsble to ensure that `poll_accept` is called from a
+ /// single task. Failing to do this could result in tasks hanging.
+ pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
- match self.io.get_ref().accept_std() {
- Ok(pair) => Poll::Ready(Ok(pair)),
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
+ match self.io.get_ref().accept() {
+ Ok((io, addr)) => {
+ let io = TcpStream::new(io)?;
+ return Poll::Ready(Ok((io, addr)));
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
/// Creates a new TCP listener from the standard library's TCP listener.
///
- /// This method can be used when the `Handle::tcp_listen` method isn't
- /// sufficient because perhaps some more configuration is needed in terms of
- /// before the calls to `bind` and `listen`.
+ /// This function is intended to be used to wrap a TCP listener from the
+ /// standard library in the Tokio equivalent. The conversion assumes nothing
+ /// about the underlying listener; it is left up to the user to set it in
+ /// non-blocking mode.
///
/// This API is typically paired with the `net2` crate and the `TcpBuilder`
/// type to build up and customize a listener before it's shipped off to the
/// backing event loop. This allows configuration of options like
/// `SO_REUSEPORT`, binding to multiple addresses, etc.
///
- /// The `addr` argument here is one of the addresses that `listener` is
- /// bound to and the listener will only be guaranteed to accept connections
- /// of the same address type currently.
- ///
- /// The platform specific behavior of this function looks like:
- ///
- /// * On Unix, the socket is placed into nonblocking mode and connections
- /// can be accepted as normal
- ///
- /// * On Windows, the address is stored internally and all future accepts
- /// will only be for the same IP version as `addr` specified. That is, if
- /// `addr` is an IPv4 address then all sockets accepted will be IPv4 as
- /// well (same for IPv6).
///
/// # Examples
///
@@ -262,14 +229,14 @@ impl TcpListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
- let io = mio::net::TcpListener::from_std(listener)?;
+ let io = mio::net::TcpListener::from_std(listener);
let io = PollEvented::new(io)?;
Ok(TcpListener { io })
}
- fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
+ pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
let io = PollEvented::new(listener)?;
Ok(TcpListener { io })
}
@@ -301,46 +268,6 @@ impl TcpListener {
self.io.get_ref().local_addr()
}
- /// Returns a stream over the connections being received on this listener.
- ///
- /// Note that `TcpListener` also directly implements `Stream`.
- ///
- /// The returned stream will never return `None` and will also not yield the
- /// peer's `SocketAddr` structure. Iterating over it is equivalent to
- /// calling accept in a loop.
- ///
- /// # Errors
- ///
- /// Note that accepting a connection can lead to various errors and not all
- /// of them are necessarily fatal ‒ for example having too many open file
- /// descriptors or the other side closing the connection while it waits in
- /// an accept queue. These would terminate the stream if not handled in any
- /// way.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::{net::TcpListener, stream::StreamExt};
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
- /// let mut incoming = listener.incoming();
- ///
- /// while let Some(stream) = incoming.next().await {
- /// match stream {
- /// Ok(stream) => {
- /// println!("new client!");
- /// }
- /// Err(e) => { /* connection failed */ }
- /// }
- /// }
- /// }
- /// ```
- pub fn incoming(&mut self) -> Incoming<'_> {
- Incoming::new(self)
- }
-
/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`].
@@ -398,29 +325,12 @@ impl TcpListener {
impl crate::stream::Stream for TcpListener {
type Item = io::Result<TcpStream>;
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
}
-impl TryFrom<TcpListener> for mio::net::TcpListener {
- type Error = io::Error;
-
- /// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
- fn try_from(value: TcpListener) -> Result<Self, Self::Error> {
- value.io.into_inner()
- }
-}
-
impl TryFrom<net::TcpListener> for TcpListener {
type Error = io::Error;
@@ -453,14 +363,12 @@ mod sys {
#[cfg(windows)]
mod sys {
- // TODO: let's land these upstream with mio and then we can add them here.
- //
- // use std::os::windows::prelude::*;
- // use super::{TcpListener;
- //
- // impl AsRawHandle for TcpListener {
- // fn as_raw_handle(&self) -> RawHandle {
- // self.listener.io().as_raw_handle()
- // }
- // }
+ use super::TcpListener;
+ use std::os::windows::prelude::*;
+
+ impl AsRawSocket for TcpListener {
+ fn as_raw_socket(&self) -> RawSocket {
+ self.io.get_ref().as_raw_socket()
+ }
+ }
}