aboutsummaryrefslogtreecommitdiff
path: root/src/net/udp.rs
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2021-02-09 18:18:56 -0800
committerStephen Hines <srhines@google.com>2021-03-03 21:34:52 +0000
commite3d8d80d2d8744ccdcd175323e0864c8f30fcedc (patch)
tree16d053e70d21e456d52f4a7762ee41441342b7a2 /src/net/udp.rs
parent925d648e545e70d6a4faae3d7efe5e0de885f922 (diff)
downloadtokio-e3d8d80d2d8744ccdcd175323e0864c8f30fcedc.tar.gz
Upgrade rust/crates/tokio to 1.2.0
Test: make Change-Id: Ib0f6a5201b51e9d122b6e867388a3856e16f803a
Diffstat (limited to 'src/net/udp.rs')
-rw-r--r--src/net/udp.rs154
1 files changed, 147 insertions, 7 deletions
diff --git a/src/net/udp.rs b/src/net/udp.rs
index 23abe98..86b4fe9 100644
--- a/src/net/udp.rs
+++ b/src/net/udp.rs
@@ -7,6 +7,10 @@ use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
cfg_net! {
/// A UDP socket
///
@@ -18,8 +22,13 @@ cfg_net! {
/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`)
/// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address
///
- /// `UdpSocket` can also be used concurrently to `send_to` and `recv_from` in different tasks,
- /// all that's required is that you `Arc<UdpSocket>` and clone a reference for each task.
+ /// This type does not provide a `split` method, because this functionality
+ /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
+ /// not need a `Mutex` to share the `UdpSocket` — an `Arc<UdpSocket>` is
+ /// enough. This is because all of the methods take `&self` instead of `&mut
+ /// self`.
+ ///
+ /// [`Arc`]: std::sync::Arc
///
/// # Streams
///
@@ -74,11 +83,12 @@ cfg_net! {
/// }
/// ```
///
- /// # Example: Sending/Receiving concurrently
+ /// # Example: Splitting with `Arc`
///
- /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright to `Arc<UdpSocket>`
- /// and share the references to multiple tasks, in order to send/receive concurrently. Here is
- /// a similar "echo" example but that supports concurrent sending/receiving:
+ /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright
+ /// to use an `Arc<UdpSocket>` and share the references to multiple tasks.
+ /// Here is a similar "echo" example that supports concurrent
+ /// sending/receiving:
///
/// ```no_run
/// use tokio::{net::UdpSocket, sync::mpsc};
@@ -683,6 +693,137 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv(buf))
}
+ cfg_io_util! {
+ /// Try to receive data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// The function must be called with valid byte array buf of sufficient size
+ /// to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `readable()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(1024);
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_buf(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the
+ // buffer.
+ let n = (&*self.io).recv(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok(n)
+ })
+ }
+
+ /// Try to receive a single datagram message on the socket. On success,
+ /// returns the number of bytes read and the origin.
+ ///
+ /// The function must be called with valid byte array buf of sufficient size
+ /// to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `readable()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(1024);
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_buf_from(&mut buf) {
+ /// Ok((n, _addr)) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the
+ // buffer.
+ let (n, addr) = (&*self.io).recv_from(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok((n, addr))
+ })
+ }
+ }
+
/// Sends data on the socket to the given address. On success, returns the
/// number of bytes written.
///
@@ -904,7 +1045,6 @@ impl UdpSocket {
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
- /// socket.connect("127.0.0.1:8081").await?;
///
/// loop {
/// // Wait for the socket to be readable