diff options
Diffstat (limited to 'src/net/udp/socket.rs')
-rw-r--r-- | src/net/udp/socket.rs | 432 |
1 files changed, 308 insertions, 124 deletions
diff --git a/src/net/udp/socket.rs b/src/net/udp/socket.rs index 97090a2..d13e92b 100644 --- a/src/net/udp/socket.rs +++ b/src/net/udp/socket.rs @@ -1,16 +1,110 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::udp::split::{split, RecvHalf, SendHalf}; -use crate::net::ToSocketAddrs; +use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::task::{Context, Poll}; -cfg_udp! { +cfg_net! { /// A UDP socket + /// + /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket` + /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`: + /// + /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`) + /// and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses + /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`) + /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address + /// + /// `UdpSocket` can also be used concurrently to `send_to` and `recv_from` in different tasks, + /// all that's required is that you `Arc<UdpSocket>` and clone a reference for each task. + /// + /// # Streams + /// + /// If you need to listen over UDP and produce a [`Stream`](`crate::stream::Stream`), you can look + /// at [`UdpFramed`]. + /// + /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html + /// + /// # Example: one to many (bind) + /// + /// Using `bind` we can create a simple echo server that sends and recv's with many different clients: + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; + /// let mut buf = [0; 1024]; + /// loop { + /// let (len, addr) = sock.recv_from(&mut buf).await?; + /// println!("{:?} bytes received from {:?}", len, addr); + /// + /// let len = sock.send_to(&buf[..len], addr).await?; + /// println!("{:?} bytes sent", len); + /// } + /// } + /// ``` + /// + /// # Example: one to one (connect) + /// + /// Or using `connect` we can echo with a single remote address using `send` and `recv`: + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; + /// + /// let remote_addr = "127.0.0.1:59611"; + /// sock.connect(remote_addr).await?; + /// let mut buf = [0; 1024]; + /// loop { + /// let len = sock.recv(&mut buf).await?; + /// println!("{:?} bytes received from {:?}", len, remote_addr); + /// + /// let len = sock.send(&buf[..len]).await?; + /// println!("{:?} bytes sent", len); + /// } + /// } + /// ``` + /// + /// # Example: Sending/Receiving concurrently + /// + /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright to `Arc<UdpSocket>` + /// and share the references to multiple tasks, in order to send/receive concurrently. Here is + /// a similar "echo" example but that supports concurrent sending/receiving: + /// + /// ```no_run + /// use tokio::{net::UdpSocket, sync::mpsc}; + /// use std::{io, net::SocketAddr, sync::Arc}; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; + /// let r = Arc::new(sock); + /// let s = r.clone(); + /// let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000); + /// + /// tokio::spawn(async move { + /// while let Some((bytes, addr)) = rx.recv().await { + /// let len = s.send_to(&bytes, &addr).await.unwrap(); + /// println!("{:?} bytes sent", len); + /// } + /// }); + /// + /// let mut buf = [0; 1024]; + /// loop { + /// let (len, addr) = r.recv_from(&mut buf).await?; + /// println!("{:?} bytes received from {:?}", len, addr); + /// tx.send((buf[..len].to_vec(), addr)).await.unwrap(); + /// } + /// } + /// ``` + /// pub struct UdpSocket { io: PollEvented<mio::net::UdpSocket>, } @@ -19,8 +113,23 @@ cfg_udp! { impl UdpSocket { /// This function will create a new UDP socket and attempt to bind it to /// the `addr` provided. + /// + /// # Example + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; + /// // use `sock` + /// # let _ = sock; + /// Ok(()) + /// } + /// ``` pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> { - let addrs = addr.to_socket_addrs().await?; + let addrs = to_socket_addrs(addr).await?; let mut last_err = None; for addr in addrs { @@ -39,7 +148,7 @@ impl UdpSocket { } fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> { - let sys = mio::net::UdpSocket::bind(&addr)?; + let sys = mio::net::UdpSocket::bind(addr)?; UdpSocket::new(sys) } @@ -64,21 +173,45 @@ impl UdpSocket { /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + /// + /// # Example + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// # use std::{io, net::SocketAddr}; + /// + /// # #[tokio::main] + /// # async fn main() -> io::Result<()> { + /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap(); + /// let std_sock = std::net::UdpSocket::bind(addr)?; + /// let sock = UdpSocket::from_std(std_sock)?; + /// // use `sock` + /// # Ok(()) + /// # } + /// ``` pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { - let io = mio::net::UdpSocket::from_socket(socket)?; - let io = PollEvented::new(io)?; - Ok(UdpSocket { io }) - } - - /// Splits the `UdpSocket` into a receive half and a send half. The two parts - /// can be used to receive and send datagrams concurrently, even from two - /// different tasks. - pub fn split(self) -> (RecvHalf, SendHalf) { - split(self) + let io = mio::net::UdpSocket::from_std(socket); + UdpSocket::new(io) } /// Returns the local address that this socket is bound to. + /// + /// # Example + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// # use std::{io, net::SocketAddr}; + /// + /// # #[tokio::main] + /// # async fn main() -> io::Result<()> { + /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap(); + /// let sock = UdpSocket::bind(addr).await?; + /// // the address the socket is bound to + /// let local_addr = sock.local_addr()?; + /// # Ok(()) + /// # } + /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { self.io.get_ref().local_addr() } @@ -86,8 +219,29 @@ impl UdpSocket { /// Connects the UDP socket setting the default destination for send() and /// limiting packets that are read via recv from the address specified in /// `addr`. + /// + /// # Example + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// # use std::{io, net::SocketAddr}; + /// + /// # #[tokio::main] + /// # async fn main() -> io::Result<()> { + /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; + /// + /// let remote_addr = "127.0.0.1:59600".parse::<SocketAddr>().unwrap(); + /// sock.connect(remote_addr).await?; + /// let mut buf = [0u8; 32]; + /// // recv from remote_addr + /// let len = sock.recv(&mut buf).await?; + /// // send to remote_addr + /// let _len = sock.send(&buf[..len]).await?; + /// # Ok(()) + /// # } + /// ``` pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> { - let addrs = addr.to_socket_addrs().await?; + let addrs = to_socket_addrs(addr).await?; let mut last_err = None; for addr in addrs { @@ -112,31 +266,24 @@ impl UdpSocket { /// will resolve to an error if the socket is not connected. /// /// [`connect`]: method@Self::connect - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send(cx, buf)).await - } - - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - #[doc(hidden)] - pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) + .await + } + + /// Try to send data on the socket to the remote address to which it is + /// connected. + /// + /// # Returns + /// + /// If successfull, the number of bytes sent is returned. Users + /// should ensure that when the remote cannot receive, the + /// [`ErrorKind::WouldBlock`] is properly handled. + /// + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock + pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { + self.io.get_ref().send(buf) } /// Returns a future that receives a single datagram message on the socket from @@ -151,21 +298,10 @@ impl UdpSocket { /// will fail if the socket is not connected. /// /// [`connect`]: method@Self::connect - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv(cx, buf)).await - } - - #[doc(hidden)] - pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io + .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) + .await } /// Returns a future that sends data on the socket to the given address. @@ -173,11 +309,27 @@ impl UdpSocket { /// /// The future will resolve to an error if the IP version of the socket does /// not match that of `target`. - pub async fn send_to<A: ToSocketAddrs>(&mut self, buf: &[u8], target: A) -> io::Result<usize> { - let mut addrs = target.to_socket_addrs().await?; + /// + /// # Example + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// # use std::{io, net::SocketAddr}; + /// + /// # #[tokio::main] + /// # async fn main() -> io::Result<()> { + /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; + /// let buf = b"hello world"; + /// let remote_addr = "127.0.0.1:58000".parse::<SocketAddr>().unwrap(); + /// let _len = sock.send_to(&buf[..], remote_addr).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> { + let mut addrs = to_socket_addrs(target).await?; match addrs.next() { - Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, + Some(target) => self.send_to_addr(buf, target).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", @@ -185,23 +337,42 @@ impl UdpSocket { } } - // TODO: Public or not? - #[doc(hidden)] - pub fn poll_send_to( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &SocketAddr, - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + /// Try to send data on the socket to the given address, but if the send is blocked + /// this will return right away. + /// + /// # Returns + /// + /// If successfull, returns the number of bytes sent + /// + /// Users should ensure that when the remote cannot receive, the + /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur + /// if the IP version of the socket does not match that of `target`. + /// + /// # Example + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// # use std::{io, net::SocketAddr}; + /// + /// # #[tokio::main] + /// # async fn main() -> io::Result<()> { + /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; + /// let buf = b"hello world"; + /// let remote_addr = "127.0.0.1:58000".parse::<SocketAddr>().unwrap(); + /// let _len = sock.try_send_to(&buf[..], remote_addr)?; + /// # Ok(()) + /// # } + /// ``` + /// + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock + pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { + self.io.get_ref().send_to(buf, target) + } + + async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { + self.io + .async_io(mio::Interest::WRITABLE, |sock| sock.send_to(buf, target)) + .await } /// Returns a future that receives a single datagram on the socket. On success, @@ -210,25 +381,26 @@ impl UdpSocket { /// The function must be called with valid byte array `buf` of sufficient size /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from(cx, buf)).await - } - - #[doc(hidden)] - pub fn poll_recv_from( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<(usize, SocketAddr), io::Error>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } + /// + /// # Example + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// # use std::{io, net::SocketAddr}; + /// + /// # #[tokio::main] + /// # async fn main() -> io::Result<()> { + /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; + /// let mut buf = [0u8; 32]; + /// let (len, addr) = sock.recv_from(&mut buf).await?; + /// println!("received {:?} bytes from {:?}", len, addr); + /// # Ok(()) + /// # } + /// ``` + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .await } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -315,6 +487,20 @@ impl UdpSocket { /// For more information about this option, see [`set_ttl`]. /// /// [`set_ttl`]: method@Self::set_ttl + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// # use std::io; + /// + /// # async fn dox() -> io::Result<()> { + /// let sock = UdpSocket::bind("127.0.0.1:8080").await?; + /// + /// println!("{:?}", sock.ttl()?); + /// # Ok(()) + /// # } + /// ``` pub fn ttl(&self) -> io::Result<u32> { self.io.get_ref().ttl() } @@ -323,6 +509,20 @@ impl UdpSocket { /// /// This value sets the time-to-live field that is used in every packet sent /// from this socket. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// # use std::io; + /// + /// # async fn dox() -> io::Result<()> { + /// let sock = UdpSocket::bind("127.0.0.1:8080").await?; + /// sock.set_ttl(60)?; + /// + /// # Ok(()) + /// # } + /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { self.io.get_ref().set_ttl(ttl) } @@ -366,28 +566,14 @@ impl UdpSocket { } } -impl TryFrom<UdpSocket> for mio::net::UdpSocket { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner - fn try_from(value: UdpSocket) -> Result<Self, Self::Error> { - value.io.into_inner() - } -} - -impl TryFrom<net::UdpSocket> for UdpSocket { +impl TryFrom<std::net::UdpSocket> for UdpSocket { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). - fn try_from(stream: net::UdpSocket) -> Result<Self, Self::Error> { + fn try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error> { Self::from_std(stream) } } @@ -412,14 +598,12 @@ mod sys { #[cfg(windows)] mod sys { - // TODO: let's land these upstream with mio and then we can add them here. - // - // use std::os::windows::prelude::*; - // use super::UdpSocket; - // - // impl AsRawHandle for UdpSocket { - // fn as_raw_handle(&self) -> RawHandle { - // self.io.get_ref().as_raw_handle() - // } - // } + use super::UdpSocket; + use std::os::windows::prelude::*; + + impl AsRawSocket for UdpSocket { + fn as_raw_socket(&self) -> RawSocket { + self.io.get_ref().as_raw_socket() + } + } } |