diff options
author | Haibo Huang <hhb@google.com> | 2021-02-09 18:18:56 -0800 |
---|---|---|
committer | Stephen Hines <srhines@google.com> | 2021-03-03 21:34:52 +0000 |
commit | e3d8d80d2d8744ccdcd175323e0864c8f30fcedc (patch) | |
tree | 16d053e70d21e456d52f4a7762ee41441342b7a2 /src/net/udp.rs | |
parent | 925d648e545e70d6a4faae3d7efe5e0de885f922 (diff) | |
download | tokio-e3d8d80d2d8744ccdcd175323e0864c8f30fcedc.tar.gz |
Upgrade rust/crates/tokio to 1.2.0
Test: make
Change-Id: Ib0f6a5201b51e9d122b6e867388a3856e16f803a
Diffstat (limited to 'src/net/udp.rs')
-rw-r--r-- | src/net/udp.rs | 154 |
1 files changed, 147 insertions, 7 deletions
diff --git a/src/net/udp.rs b/src/net/udp.rs index 23abe98..86b4fe9 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -7,6 +7,10 @@ use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::task::{Context, Poll}; +cfg_io_util! { + use bytes::BufMut; +} + cfg_net! { /// A UDP socket /// @@ -18,8 +22,13 @@ cfg_net! { /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`) /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address /// - /// `UdpSocket` can also be used concurrently to `send_to` and `recv_from` in different tasks, - /// all that's required is that you `Arc<UdpSocket>` and clone a reference for each task. + /// This type does not provide a `split` method, because this functionality + /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do + /// not need a `Mutex` to share the `UdpSocket` — an `Arc<UdpSocket>` is + /// enough. This is because all of the methods take `&self` instead of `&mut + /// self`. + /// + /// [`Arc`]: std::sync::Arc /// /// # Streams /// @@ -74,11 +83,12 @@ cfg_net! { /// } /// ``` /// - /// # Example: Sending/Receiving concurrently + /// # Example: Splitting with `Arc` /// - /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright to `Arc<UdpSocket>` - /// and share the references to multiple tasks, in order to send/receive concurrently. Here is - /// a similar "echo" example but that supports concurrent sending/receiving: + /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright + /// to use an `Arc<UdpSocket>` and share the references to multiple tasks. + /// Here is a similar "echo" example that supports concurrent + /// sending/receiving: /// /// ```no_run /// use tokio::{net::UdpSocket, sync::mpsc}; @@ -683,6 +693,137 @@ impl UdpSocket { .try_io(Interest::READABLE, || self.io.recv(buf)) } + cfg_io_util! { + /// Try to receive data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// The function must be called with valid byte array buf of sufficient size + /// to hold the message bytes. If a message is too long to fit in the + /// supplied buffer, excess bytes may be discarded. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// let mut buf = Vec::with_capacity(1024); + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv_buf(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.io.registration().try_io(Interest::READABLE, || { + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the + // buffer. + let n = (&*self.io).recv(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + + /// Try to receive a single datagram message on the socket. On success, + /// returns the number of bytes read and the origin. + /// + /// The function must be called with valid byte array buf of sufficient size + /// to hold the message bytes. If a message is too long to fit in the + /// supplied buffer, excess bytes may be discarded. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// let mut buf = Vec::with_capacity(1024); + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv_buf_from(&mut buf) { + /// Ok((n, _addr)) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { + self.io.registration().try_io(Interest::READABLE, || { + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the + // buffer. + let (n, addr) = (&*self.io).recv_from(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok((n, addr)) + }) + } + } + /// Sends data on the socket to the given address. On success, returns the /// number of bytes written. /// @@ -904,7 +1045,6 @@ impl UdpSocket { /// async fn main() -> io::Result<()> { /// // Connect to a peer /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; /// /// loop { /// // Wait for the socket to be readable |