aboutsummaryrefslogtreecommitdiff
path: root/src/net/udp/socket.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/udp/socket.rs')
-rw-r--r--src/net/udp/socket.rs432
1 files changed, 308 insertions, 124 deletions
diff --git a/src/net/udp/socket.rs b/src/net/udp/socket.rs
index 97090a2..d13e92b 100644
--- a/src/net/udp/socket.rs
+++ b/src/net/udp/socket.rs
@@ -1,16 +1,110 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::udp::split::{split, RecvHalf, SendHalf};
-use crate::net::ToSocketAddrs;
+use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::task::{Context, Poll};
-cfg_udp! {
+cfg_net! {
/// A UDP socket
+ ///
+ /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket`
+ /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`:
+ ///
+ /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`)
+ /// and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses
+ /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`)
+ /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address
+ ///
+ /// `UdpSocket` can also be used concurrently to `send_to` and `recv_from` in different tasks,
+ /// all that's required is that you `Arc<UdpSocket>` and clone a reference for each task.
+ ///
+ /// # Streams
+ ///
+ /// If you need to listen over UDP and produce a [`Stream`](`crate::stream::Stream`), you can look
+ /// at [`UdpFramed`].
+ ///
+ /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html
+ ///
+ /// # Example: one to many (bind)
+ ///
+ /// Using `bind` we can create a simple echo server that sends and recv's with many different clients:
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
+ /// let mut buf = [0; 1024];
+ /// loop {
+ /// let (len, addr) = sock.recv_from(&mut buf).await?;
+ /// println!("{:?} bytes received from {:?}", len, addr);
+ ///
+ /// let len = sock.send_to(&buf[..len], addr).await?;
+ /// println!("{:?} bytes sent", len);
+ /// }
+ /// }
+ /// ```
+ ///
+ /// # Example: one to one (connect)
+ ///
+ /// Or using `connect` we can echo with a single remote address using `send` and `recv`:
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
+ ///
+ /// let remote_addr = "127.0.0.1:59611";
+ /// sock.connect(remote_addr).await?;
+ /// let mut buf = [0; 1024];
+ /// loop {
+ /// let len = sock.recv(&mut buf).await?;
+ /// println!("{:?} bytes received from {:?}", len, remote_addr);
+ ///
+ /// let len = sock.send(&buf[..len]).await?;
+ /// println!("{:?} bytes sent", len);
+ /// }
+ /// }
+ /// ```
+ ///
+ /// # Example: Sending/Receiving concurrently
+ ///
+ /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright to `Arc<UdpSocket>`
+ /// and share the references to multiple tasks, in order to send/receive concurrently. Here is
+ /// a similar "echo" example but that supports concurrent sending/receiving:
+ ///
+ /// ```no_run
+ /// use tokio::{net::UdpSocket, sync::mpsc};
+ /// use std::{io, net::SocketAddr, sync::Arc};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
+ /// let r = Arc::new(sock);
+ /// let s = r.clone();
+ /// let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);
+ ///
+ /// tokio::spawn(async move {
+ /// while let Some((bytes, addr)) = rx.recv().await {
+ /// let len = s.send_to(&bytes, &addr).await.unwrap();
+ /// println!("{:?} bytes sent", len);
+ /// }
+ /// });
+ ///
+ /// let mut buf = [0; 1024];
+ /// loop {
+ /// let (len, addr) = r.recv_from(&mut buf).await?;
+ /// println!("{:?} bytes received from {:?}", len, addr);
+ /// tx.send((buf[..len].to_vec(), addr)).await.unwrap();
+ /// }
+ /// }
+ /// ```
+ ///
pub struct UdpSocket {
io: PollEvented<mio::net::UdpSocket>,
}
@@ -19,8 +113,23 @@ cfg_udp! {
impl UdpSocket {
/// This function will create a new UDP socket and attempt to bind it to
/// the `addr` provided.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
+ /// // use `sock`
+ /// # let _ = sock;
+ /// Ok(())
+ /// }
+ /// ```
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> {
- let addrs = addr.to_socket_addrs().await?;
+ let addrs = to_socket_addrs(addr).await?;
let mut last_err = None;
for addr in addrs {
@@ -39,7 +148,7 @@ impl UdpSocket {
}
fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> {
- let sys = mio::net::UdpSocket::bind(&addr)?;
+ let sys = mio::net::UdpSocket::bind(addr)?;
UdpSocket::new(sys)
}
@@ -64,21 +173,45 @@ impl UdpSocket {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
+ /// let std_sock = std::net::UdpSocket::bind(addr)?;
+ /// let sock = UdpSocket::from_std(std_sock)?;
+ /// // use `sock`
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
- let io = mio::net::UdpSocket::from_socket(socket)?;
- let io = PollEvented::new(io)?;
- Ok(UdpSocket { io })
- }
-
- /// Splits the `UdpSocket` into a receive half and a send half. The two parts
- /// can be used to receive and send datagrams concurrently, even from two
- /// different tasks.
- pub fn split(self) -> (RecvHalf, SendHalf) {
- split(self)
+ let io = mio::net::UdpSocket::from_std(socket);
+ UdpSocket::new(io)
}
/// Returns the local address that this socket is bound to.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
+ /// let sock = UdpSocket::bind(addr).await?;
+ /// // the address the socket is bound to
+ /// let local_addr = sock.local_addr()?;
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
}
@@ -86,8 +219,29 @@ impl UdpSocket {
/// Connects the UDP socket setting the default destination for send() and
/// limiting packets that are read via recv from the address specified in
/// `addr`.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
+ ///
+ /// let remote_addr = "127.0.0.1:59600".parse::<SocketAddr>().unwrap();
+ /// sock.connect(remote_addr).await?;
+ /// let mut buf = [0u8; 32];
+ /// // recv from remote_addr
+ /// let len = sock.recv(&mut buf).await?;
+ /// // send to remote_addr
+ /// let _len = sock.send(&buf[..len]).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> {
- let addrs = addr.to_socket_addrs().await?;
+ let addrs = to_socket_addrs(addr).await?;
let mut last_err = None;
for addr in addrs {
@@ -112,31 +266,24 @@ impl UdpSocket {
/// will resolve to an error if the socket is not connected.
///
/// [`connect`]: method@Self::connect
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_send(cx, buf)).await
- }
-
- // Poll IO functions that takes `&self` are provided for the split API.
- //
- // They are not public because (taken from the doc of `PollEvented`):
- //
- // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
- // caller must ensure that there are at most two tasks that use a
- // `PollEvented` instance concurrently. One for reading and one for writing.
- // While violating this requirement is "safe" from a Rust memory model point
- // of view, it will result in unexpected behavior in the form of lost
- // notifications and tasks hanging.
- #[doc(hidden)]
- pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf))
+ .await
+ }
+
+ /// Try to send data on the socket to the remote address to which it is
+ /// connected.
+ ///
+ /// # Returns
+ ///
+ /// If successfull, the number of bytes sent is returned. Users
+ /// should ensure that when the remote cannot receive, the
+ /// [`ErrorKind::WouldBlock`] is properly handled.
+ ///
+ /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
+ pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io.get_ref().send(buf)
}
/// Returns a future that receives a single datagram message on the socket from
@@ -151,21 +298,10 @@ impl UdpSocket {
/// will fail if the socket is not connected.
///
/// [`connect`]: method@Self::connect
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_recv(cx, buf)).await
- }
-
- #[doc(hidden)]
- pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Interest::READABLE, |sock| sock.recv(buf))
+ .await
}
/// Returns a future that sends data on the socket to the given address.
@@ -173,11 +309,27 @@ impl UdpSocket {
///
/// The future will resolve to an error if the IP version of the socket does
/// not match that of `target`.
- pub async fn send_to<A: ToSocketAddrs>(&mut self, buf: &[u8], target: A) -> io::Result<usize> {
- let mut addrs = target.to_socket_addrs().await?;
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
+ /// let buf = b"hello world";
+ /// let remote_addr = "127.0.0.1:58000".parse::<SocketAddr>().unwrap();
+ /// let _len = sock.send_to(&buf[..], remote_addr).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> {
+ let mut addrs = to_socket_addrs(target).await?;
match addrs.next() {
- Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await,
+ Some(target) => self.send_to_addr(buf, target).await,
None => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"no addresses to send data to",
@@ -185,23 +337,42 @@ impl UdpSocket {
}
}
- // TODO: Public or not?
- #[doc(hidden)]
- pub fn poll_send_to(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- target: &SocketAddr,
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send_to(buf, target) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ /// Try to send data on the socket to the given address, but if the send is blocked
+ /// this will return right away.
+ ///
+ /// # Returns
+ ///
+ /// If successfull, returns the number of bytes sent
+ ///
+ /// Users should ensure that when the remote cannot receive, the
+ /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur
+ /// if the IP version of the socket does not match that of `target`.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
+ /// let buf = b"hello world";
+ /// let remote_addr = "127.0.0.1:58000".parse::<SocketAddr>().unwrap();
+ /// let _len = sock.try_send_to(&buf[..], remote_addr)?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
+ pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
+ self.io.get_ref().send_to(buf, target)
+ }
+
+ async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Interest::WRITABLE, |sock| sock.send_to(buf, target))
+ .await
}
/// Returns a future that receives a single datagram on the socket. On success,
@@ -210,25 +381,26 @@ impl UdpSocket {
/// The function must be called with valid byte array `buf` of sufficient size
/// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded.
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.poll_recv_from(cx, buf)).await
- }
-
- #[doc(hidden)]
- pub fn poll_recv_from(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<Result<(usize, SocketAddr), io::Error>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv_from(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
+ /// let mut buf = [0u8; 32];
+ /// let (len, addr) = sock.recv_from(&mut buf).await?;
+ /// println!("received {:?} bytes from {:?}", len, addr);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf))
+ .await
}
/// Gets the value of the `SO_BROADCAST` option for this socket.
@@ -315,6 +487,20 @@ impl UdpSocket {
/// For more information about this option, see [`set_ttl`].
///
/// [`set_ttl`]: method@Self::set_ttl
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::io;
+ ///
+ /// # async fn dox() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// println!("{:?}", sock.ttl()?);
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn ttl(&self) -> io::Result<u32> {
self.io.get_ref().ttl()
}
@@ -323,6 +509,20 @@ impl UdpSocket {
///
/// This value sets the time-to-live field that is used in every packet sent
/// from this socket.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::io;
+ ///
+ /// # async fn dox() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// sock.set_ttl(60)?;
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.io.get_ref().set_ttl(ttl)
}
@@ -366,28 +566,14 @@ impl UdpSocket {
}
}
-impl TryFrom<UdpSocket> for mio::net::UdpSocket {
- type Error = io::Error;
-
- /// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
- fn try_from(value: UdpSocket) -> Result<Self, Self::Error> {
- value.io.into_inner()
- }
-}
-
-impl TryFrom<net::UdpSocket> for UdpSocket {
+impl TryFrom<std::net::UdpSocket> for UdpSocket {
type Error = io::Error;
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`UdpSocket::from_std(stream)`](UdpSocket::from_std).
- fn try_from(stream: net::UdpSocket) -> Result<Self, Self::Error> {
+ fn try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error> {
Self::from_std(stream)
}
}
@@ -412,14 +598,12 @@ mod sys {
#[cfg(windows)]
mod sys {
- // TODO: let's land these upstream with mio and then we can add them here.
- //
- // use std::os::windows::prelude::*;
- // use super::UdpSocket;
- //
- // impl AsRawHandle for UdpSocket {
- // fn as_raw_handle(&self) -> RawHandle {
- // self.io.get_ref().as_raw_handle()
- // }
- // }
+ use super::UdpSocket;
+ use std::os::windows::prelude::*;
+
+ impl AsRawSocket for UdpSocket {
+ fn as_raw_socket(&self) -> RawSocket {
+ self.io.get_ref().as_raw_socket()
+ }
+ }
}