aboutsummaryrefslogtreecommitdiff
path: root/src/net/tcp/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/tcp/stream.rs')
-rw-r--r--src/net/tcp/stream.rs327
1 files changed, 184 insertions, 143 deletions
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index 60d20fd..b17d33f 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -1,8 +1,12 @@
-use crate::future::poll_fn;
+cfg_not_wasi! {
+ use crate::future::poll_fn;
+ use crate::net::{to_socket_addrs, ToSocketAddrs};
+ use std::time::Duration;
+}
+
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
-use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
use std::fmt;
@@ -10,7 +14,6 @@ use std::io;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
-use std::time::Duration;
cfg_io_util! {
use bytes::BufMut;
@@ -70,86 +73,88 @@ cfg_net! {
}
impl TcpStream {
- /// Opens a TCP connection to a remote host.
- ///
- /// `addr` is an address of the remote host. Anything which implements the
- /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
- /// yields multiple addresses, connect will be attempted with each of the
- /// addresses until a connection is successful. If none of the addresses
- /// result in a successful connection, the error returned from the last
- /// connection attempt (the last address) is returned.
- ///
- /// To configure the socket before connecting, you can use the [`TcpSocket`]
- /// type.
- ///
- /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
- /// [`TcpSocket`]: struct@crate::net::TcpSocket
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- /// use tokio::io::AsyncWriteExt;
- /// use std::error::Error;
- ///
- /// #[tokio::main]
- /// async fn main() -> Result<(), Box<dyn Error>> {
- /// // Connect to a peer
- /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// // Write some data.
- /// stream.write_all(b"hello world!").await?;
- ///
- /// Ok(())
- /// }
- /// ```
- ///
- /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
- ///
- /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
- /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
- pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
- let addrs = to_socket_addrs(addr).await?;
+ cfg_not_wasi! {
+ /// Opens a TCP connection to a remote host.
+ ///
+ /// `addr` is an address of the remote host. Anything which implements the
+ /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
+ /// yields multiple addresses, connect will be attempted with each of the
+ /// addresses until a connection is successful. If none of the addresses
+ /// result in a successful connection, the error returned from the last
+ /// connection attempt (the last address) is returned.
+ ///
+ /// To configure the socket before connecting, you can use the [`TcpSocket`]
+ /// type.
+ ///
+ /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
+ /// [`TcpSocket`]: struct@crate::net::TcpSocket
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use tokio::io::AsyncWriteExt;
+ /// use std::error::Error;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// // Write some data.
+ /// stream.write_all(b"hello world!").await?;
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
+ ///
+ /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
+ /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+ pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
+ let addrs = to_socket_addrs(addr).await?;
- let mut last_err = None;
+ let mut last_err = None;
- for addr in addrs {
- match TcpStream::connect_addr(addr).await {
- Ok(stream) => return Ok(stream),
- Err(e) => last_err = Some(e),
+ for addr in addrs {
+ match TcpStream::connect_addr(addr).await {
+ Ok(stream) => return Ok(stream),
+ Err(e) => last_err = Some(e),
+ }
}
+
+ Err(last_err.unwrap_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "could not resolve to any address",
+ )
+ }))
}
- Err(last_err.unwrap_or_else(|| {
- io::Error::new(
- io::ErrorKind::InvalidInput,
- "could not resolve to any address",
- )
- }))
- }
+ /// Establishes a connection to the specified `addr`.
+ async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
+ let sys = mio::net::TcpStream::connect(addr)?;
+ TcpStream::connect_mio(sys).await
+ }
- /// Establishes a connection to the specified `addr`.
- async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
- let sys = mio::net::TcpStream::connect(addr)?;
- TcpStream::connect_mio(sys).await
- }
+ pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
+ let stream = TcpStream::new(sys)?;
- pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
- let stream = TcpStream::new(sys)?;
+ // Once we've connected, wait for the stream to be writable as
+ // that's when the actual connection has been initiated. Once we're
+ // writable we check for `take_socket_error` to see if the connect
+ // actually hit an error or not.
+ //
+ // If all that succeeded then we ship everything on up.
+ poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
- // Once we've connected, wait for the stream to be writable as
- // that's when the actual connection has been initiated. Once we're
- // writable we check for `take_socket_error` to see if the connect
- // actually hit an error or not.
- //
- // If all that succeeded then we ship everything on up.
- poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
+ if let Some(e) = stream.io.take_error()? {
+ return Err(e);
+ }
- if let Some(e) = stream.io.take_error()? {
- return Err(e);
+ Ok(stream)
}
-
- Ok(stream)
}
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
@@ -160,9 +165,16 @@ impl TcpStream {
/// Creates new `TcpStream` from a `std::net::TcpStream`.
///
/// This function is intended to be used to wrap a TCP stream from the
- /// standard library in the Tokio equivalent. The conversion assumes nothing
- /// about the underlying stream; it is left up to the user to set it in
- /// non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the stream is in
+ /// non-blocking mode. Otherwise all I/O operations on the stream
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
///
/// # Examples
///
@@ -181,11 +193,13 @@ impl TcpStream {
///
/// # Panics
///
- /// This function panics if thread-local runtime is not set.
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ #[track_caller]
pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_std(stream);
let io = PollEvented::new(io)?;
@@ -244,6 +258,15 @@ impl TcpStream {
.map(|io| io.into_raw_socket())
.map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
}
+
+ #[cfg(tokio_wasi)]
+ {
+ use std::os::wasi::io::{FromRawFd, IntoRawFd};
+ self.io
+ .into_inner()
+ .map(|io| io.into_raw_fd())
+ .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
+ }
}
/// Returns the local address that this stream is bound to.
@@ -264,6 +287,11 @@ impl TcpStream {
self.io.local_addr()
}
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+
/// Returns the remote address that this stream is connected to.
///
/// # Examples
@@ -356,6 +384,12 @@ impl TcpStream {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
@@ -387,7 +421,7 @@ impl TcpStream {
/// // if the readiness event is a false positive.
/// match stream.try_read(&mut data) {
/// Ok(n) => {
- /// println!("read {} bytes", n);
+ /// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
@@ -526,8 +560,12 @@ impl TcpStream {
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
- /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
- /// and will no longer yield data. If the stream is not ready to read data
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The stream's read half is closed and will no longer yield data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the stream is not ready to read data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
@@ -939,7 +977,7 @@ impl TcpStream {
/// Tries to read or write from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The closure
- /// should attempt to perform IO operation from the socket by manually
+ /// should attempt to perform IO operation on the socket by manually
/// calling the appropriate syscall. If the operation fails because the
/// socket is not actually ready, then the closure should return a
/// `WouldBlock` error and the readiness flag is cleared. The return value
@@ -958,6 +996,11 @@ impl TcpStream {
/// defined on the Tokio `TcpStream` type, as this will mess with the
/// readiness flag and can cause the socket to behave incorrectly.
///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ ///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: TcpStream::readable()
@@ -968,7 +1011,9 @@ impl TcpStream {
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
- self.io.registration().try_io(interest, f)
+ self.io
+ .registration()
+ .try_io(interest, || self.io.try_io(f))
}
/// Receives data on the socket from the remote address to which it is
@@ -1070,69 +1115,53 @@ impl TcpStream {
self.io.set_nodelay(nodelay)
}
- /// Reads the linger duration for this socket by getting the `SO_LINGER`
- /// option.
- ///
- /// For more information about this option, see [`set_linger`].
- ///
- /// [`set_linger`]: TcpStream::set_linger
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- ///
- /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
- /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// println!("{:?}", stream.linger()?);
- /// # Ok(())
- /// # }
- /// ```
- pub fn linger(&self) -> io::Result<Option<Duration>> {
- let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());
-
- mio_socket.get_linger()
- }
-
- /// Sets the linger duration of this socket by setting the SO_LINGER option.
- ///
- /// This option controls the action taken when a stream has unsent messages and the stream is
- /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
- /// data or until the time expires.
- ///
- /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
- /// way that allows the process to continue as quickly as possible.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- ///
- /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
- /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// stream.set_linger(None)?;
- /// # Ok(())
- /// # }
- /// ```
- pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
- let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());
-
- mio_socket.set_linger(dur)
- }
-
- fn to_mio(&self) -> mio::net::TcpSocket {
- #[cfg(windows)]
- {
- use std::os::windows::io::{AsRawSocket, FromRawSocket};
- unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) }
+ cfg_not_wasi! {
+ /// Reads the linger duration for this socket by getting the `SO_LINGER`
+ /// option.
+ ///
+ /// For more information about this option, see [`set_linger`].
+ ///
+ /// [`set_linger`]: TcpStream::set_linger
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// println!("{:?}", stream.linger()?);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn linger(&self) -> io::Result<Option<Duration>> {
+ socket2::SockRef::from(self).linger()
}
- #[cfg(unix)]
- {
- use std::os::unix::io::{AsRawFd, FromRawFd};
- unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) }
+ /// Sets the linger duration of this socket by setting the SO_LINGER option.
+ ///
+ /// This option controls the action taken when a stream has unsent messages and the stream is
+ /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
+ /// data or until the time expires.
+ ///
+ /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
+ /// way that allows the process to continue as quickly as possible.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// stream.set_linger(None)?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+ socket2::SockRef::from(self).set_linger(dur)
}
}
@@ -1326,3 +1355,15 @@ mod sys {
}
}
}
+
+#[cfg(all(tokio_unstable, tokio_wasi))]
+mod sys {
+ use super::TcpStream;
+ use std::os::wasi::prelude::*;
+
+ impl AsRawFd for TcpStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+ }
+}