aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp16
-rw-r--r--CHANGELOG.md14
-rw-r--r--Cargo.toml13
-rw-r--r--Cargo.toml.orig10
-rw-r--r--METADATA6
-rw-r--r--README.md2
-rw-r--r--src/io/async_fd.rs337
-rw-r--r--src/io/driver/mod.rs53
-rw-r--r--src/io/driver/scheduled_io.rs33
-rw-r--r--src/io/mod.rs12
-rw-r--r--src/io/read_buf.rs9
-rw-r--r--src/io/util/async_read_ext.rs2
-rw-r--r--src/lib.rs5
-rw-r--r--src/loom/std/mod.rs2
-rw-r--r--src/loom/std/parking_lot.rs5
-rw-r--r--src/net/tcp/listener.rs6
-rw-r--r--src/net/tcp/stream.rs6
-rw-r--r--src/net/udp/socket.rs293
-rw-r--r--src/net/unix/datagram/socket.rs9
-rw-r--r--src/net/unix/listener.rs11
-rw-r--r--src/net/unix/stream.rs9
-rw-r--r--src/sync/oneshot.rs148
-rw-r--r--src/sync/rwlock.rs5
-rw-r--r--src/sync/tests/loom_rwlock.rs25
-rw-r--r--src/sync/tests/mod.rs1
-rw-r--r--tests/io_async_fd.rs604
-rw-r--r--tests/udp.rs131
28 files changed, 1643 insertions, 126 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 4ca07f3..9faa151 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "d14cbf91162d1ae2976a67a43e1f06cda1c21e29"
+ "sha1": "9097ae548f9a1bcd261385ceba29b800b0ee2a21"
}
}
diff --git a/Android.bp b/Android.bp
index fcb07e5..0faaf79 100644
--- a/Android.bp
+++ b/Android.bp
@@ -37,26 +37,24 @@ rust_library {
"libpin_project_lite",
"libslab",
],
- proc_macros: [
- "libtokio_macros",
- ],
+ proc_macros: ["libtokio_macros"],
}
// dependent_library ["feature_list"]
// bytes-0.6.0 "default,std"
// cfg-if-0.1.10
// fnv-1.0.7 "default,std"
-// futures-core-0.3.6 "alloc,default,std"
+// futures-core-0.3.7 "alloc,default,std"
// lazy_static-1.4.0
-// libc-0.2.79 "default,std"
+// libc-0.2.80 "default,extra_traits,std"
// log-0.4.11
-// memchr-2.3.3 "default,std"
-// mio-0.7.4 "default,os-poll,tcp,udp,uds"
+// memchr-2.3.4 "default,std"
+// mio-0.7.4 "default,os-poll,os-util,tcp,udp,uds"
// num_cpus-1.13.0
// pin-project-lite-0.1.11
// proc-macro2-1.0.24 "default,proc-macro"
// quote-1.0.7 "default,proc-macro"
// slab-0.4.2
-// syn-1.0.46 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit,visit-mut"
-// tokio-macros-0.3.0
+// syn-1.0.48 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit,visit-mut"
+// tokio-macros-0.3.1
// unicode-xid-0.2.1 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 001bbef..fa17be4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,17 @@
+# 0.3.2 (October 27, 2020)
+
+Adds `AsyncFd` as a replacement for v0.2's `PollEvented`.
+
+### Fixed
+- io: fix a potential deadlock when shutting down the I/O driver (#2903).
+- sync: `RwLockWriteGuard::downgrade()` bug (#2957).
+
+### Added
+- io: `AsyncFd` for receiving readiness events on raw FDs (#2903).
+- net: `poll_*` function on `UdpSocket` (#2981).
+- net: `UdpSocket::take_error()` (#3051).
+- sync: `oneshot::Sender::poll_closed()` (#3032).
+
# 0.3.1 (October 21, 2020)
This release fixes an use-after-free in the IO driver. Additionally, the `read_buf`
diff --git a/Cargo.toml b/Cargo.toml
index d4216f6..da14246 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "tokio"
-version = "0.3.1"
+version = "0.3.2"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio/0.3.1/tokio/"
+documentation = "https://docs.rs/tokio/0.3.2/tokio/"
readme = "README.md"
keywords = ["io", "async", "non-blocking", "futures"]
categories = ["asynchronous", "network-programming"]
@@ -97,11 +97,11 @@ full = ["fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "r
io-std = []
io-util = ["memchr", "bytes"]
macros = ["tokio-macros"]
-net = ["lazy_static", "libc", "mio/os-poll", "mio/tcp", "mio/udp", "mio/uds"]
+net = ["lazy_static", "libc", "mio/os-poll", "mio/os-util", "mio/tcp", "mio/udp", "mio/uds"]
process = ["bytes", "lazy_static", "libc", "mio/os-poll", "mio/os-util", "mio/uds", "signal-hook-registry", "winapi/threadpoollegacyapiset"]
rt = ["slab"]
rt-multi-thread = ["num_cpus", "rt"]
-signal = ["lazy_static", "libc", "mio/os-poll", "mio/uds", "signal-hook-registry", "winapi/consoleapi"]
+signal = ["lazy_static", "libc", "mio/os-poll", "mio/uds", "mio/os-util", "signal-hook-registry", "winapi/consoleapi"]
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
@@ -116,6 +116,11 @@ optional = true
[target."cfg(unix)".dependencies.signal-hook-registry]
version = "1.1.1"
optional = true
+[target."cfg(unix)".dev-dependencies.libc]
+version = "0.2.42"
+
+[target."cfg(unix)".dev-dependencies.nix]
+version = "0.18.0"
[target."cfg(windows)".dependencies.winapi]
version = "0.3.8"
optional = true
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 57e49d6..144c2d5 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -8,12 +8,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.3.x" git tag.
-version = "0.3.1"
+version = "0.3.2"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
-documentation = "https://docs.rs/tokio/0.3.1/tokio/"
+documentation = "https://docs.rs/tokio/0.3.2/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """
@@ -53,6 +53,7 @@ net = [
"lazy_static",
"libc",
"mio/os-poll",
+ "mio/os-util",
"mio/tcp",
"mio/udp",
"mio/uds",
@@ -78,6 +79,7 @@ signal = [
"libc",
"mio/os-poll",
"mio/uds",
+ "mio/os-util",
"signal-hook-registry",
"winapi/consoleapi",
]
@@ -107,6 +109,10 @@ tracing = { version = "0.1.16", default-features = false, features = ["std"], op
libc = { version = "0.2.42", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }
+[target.'cfg(unix)'.dev-dependencies]
+libc = { version = "0.2.42" }
+nix = { version = "0.18.0" }
+
[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.8"
default-features = false
diff --git a/METADATA b/METADATA
index 353ca22..0de43aa 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio/tokio-0.3.1.crate"
+ value: "https://static.crates.io/crates/tokio/tokio-0.3.2.crate"
}
- version: "0.3.1"
+ version: "0.3.2"
license_type: NOTICE
last_upgrade_date {
year: 2020
month: 10
- day: 23
+ day: 28
}
}
diff --git a/README.md b/README.md
index da9078c..b530599 100644
--- a/README.md
+++ b/README.md
@@ -157,7 +157,7 @@ several other libraries, including:
## Supported Rust Versions
-Tokio is built against the latest stable release. The minimum supported version is 1.39.
+Tokio is built against the latest stable release. The minimum supported version is 1.45.
The current Tokio version is not guaranteed to build on Rust versions earlier than the
minimum supported version.
diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs
new file mode 100644
index 0000000..e5ad2ab
--- /dev/null
+++ b/src/io/async_fd.rs
@@ -0,0 +1,337 @@
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::{task::Context, task::Poll};
+
+use std::io;
+
+use mio::unix::SourceFd;
+
+use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo};
+use crate::util::slab;
+
+/// Associates an IO object backed by a Unix file descriptor with the tokio
+/// reactor, allowing for readiness to be polled. The file descriptor must be of
+/// a type that can be used with the OS polling facilities (ie, `poll`, `epoll`,
+/// `kqueue`, etc), such as a network socket or pipe.
+///
+/// Creating an AsyncFd registers the file descriptor with the current tokio
+/// Reactor, allowing you to directly await the file descriptor being readable
+/// or writable. Once registered, the file descriptor remains registered until
+/// the AsyncFd is dropped.
+///
+/// The AsyncFd takes ownership of an arbitrary object to represent the IO
+/// object. It is intended that this object will handle closing the file
+/// descriptor when it is dropped, avoiding resource leaks and ensuring that the
+/// AsyncFd can clean up the registration before closing the file descriptor.
+/// The [`AsyncFd::into_inner`] function can be used to extract the inner object
+/// to retake control from the tokio IO reactor.
+///
+/// The inner object is required to implement [`AsRawFd`]. This file descriptor
+/// must not change while [`AsyncFd`] owns the inner object. Changing the file
+/// descriptor results in unspecified behavior in the IO driver, which may
+/// include breaking notifications for other sockets/etc.
+///
+/// Polling for readiness is done by calling the async functions [`readable`]
+/// and [`writable`]. These functions complete when the associated readiness
+/// condition is observed. Any number of tasks can query the same `AsyncFd` in
+/// parallel, on the same or different conditions.
+///
+/// On some platforms, the readiness detecting mechanism relies on
+/// edge-triggered notifications. This means that the OS will only notify Tokio
+/// when the file descriptor transitions from not-ready to ready. Tokio
+/// internally tracks when it has received a ready notification, and when
+/// readiness checking functions like [`readable`] and [`writable`] are called,
+/// if the readiness flag is set, these async functions will complete
+/// immediately.
+///
+/// This however does mean that it is critical to ensure that this ready flag is
+/// cleared when (and only when) the file descriptor ceases to be ready. The
+/// [`AsyncFdReadyGuard`] returned from readiness checking functions serves this
+/// function; after calling a readiness-checking async function, you must use
+/// this [`AsyncFdReadyGuard`] to signal to tokio whether the file descriptor is no
+/// longer in a ready state.
+///
+/// ## Use with to a poll-based API
+///
+/// In some cases it may be desirable to use `AsyncFd` from APIs similar to
+/// [`TcpStream::poll_read_ready`]. The [`AsyncFd::poll_read_ready`] and
+/// [`AsyncFd::poll_write_ready`] functions are provided for this purpose.
+/// Because these functions don't create a future to hold their state, they have
+/// the limitation that only one task can wait on each direction (read or write)
+/// at a time.
+///
+/// [`readable`]: method@Self::readable
+/// [`writable`]: method@Self::writable
+/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
+/// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
+pub struct AsyncFd<T: AsRawFd> {
+ handle: Handle,
+ shared: slab::Ref<ScheduledIo>,
+ inner: Option<T>,
+}
+
+impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_ref().unwrap().as_raw_fd()
+ }
+}
+
+impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("AsyncFd")
+ .field("inner", &self.inner)
+ .finish()
+ }
+}
+
+const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);
+
+/// Represents an IO-ready event detected on a particular file descriptor, which
+/// has not yet been acknowledged. This is a `must_use` structure to help ensure
+/// that you do not forget to explicitly clear (or not clear) the event.
+#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
+pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
+ async_fd: &'a AsyncFd<T>,
+ event: Option<ReadyEvent>,
+}
+
+impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ReadyGuard")
+ .field("async_fd", &self.async_fd)
+ .finish()
+ }
+}
+
+impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
+ /// Indicates to tokio that the file descriptor is no longer ready. The
+ /// internal readiness flag will be cleared, and tokio will wait for the
+ /// next edge-triggered readiness notification from the OS.
+ ///
+ /// It is critical that this function not be called unless your code
+ /// _actually observes_ that the file descriptor is _not_ ready. Do not call
+ /// it simply because, for example, a read succeeded; it should be called
+ /// when a read is observed to block.
+ ///
+ /// [`drop`]: method@std::mem::drop
+ pub fn clear_ready(&mut self) {
+ if let Some(event) = self.event.take() {
+ self.async_fd.shared.clear_readiness(event);
+ }
+ }
+
+ /// This function should be invoked when you intentionally want to keep the
+ /// ready flag asserted.
+ ///
+ /// While this function is itself a no-op, it satisfies the `#[must_use]`
+ /// constraint on the [`AsyncFdReadyGuard`] type.
+ pub fn retain_ready(&mut self) {
+ // no-op
+ }
+
+ /// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error,
+ /// the readiness state associated with this file descriptor is cleared.
+ ///
+ /// This method helps ensure that the readiness state of the underlying file
+ /// descriptor remains in sync with the tokio-side readiness state, by
+ /// clearing the tokio-side state only when a [`WouldBlock`] condition
+ /// occurs. It is the responsibility of the caller to ensure that `f`
+ /// returns [`WouldBlock`] only if the file descriptor that originated this
+ /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
+ /// create this `AsyncFdReadyGuard`.
+ ///
+ /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
+ pub fn with_io<R>(&mut self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
+ let result = f();
+
+ if let Err(e) = result.as_ref() {
+ if e.kind() == io::ErrorKind::WouldBlock {
+ self.clear_ready();
+ }
+ }
+
+ result
+ }
+
+ /// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness
+ /// state associated with this file descriptor is cleared.
+ ///
+ /// This method helps ensure that the readiness state of the underlying file
+ /// descriptor remains in sync with the tokio-side readiness state, by
+ /// clearing the tokio-side state only when a [`Pending`] condition occurs.
+ /// It is the responsibility of the caller to ensure that `f` returns
+ /// [`Pending`] only if the file descriptor that originated this
+ /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
+ /// create this `AsyncFdReadyGuard`.
+ ///
+ /// [`Pending`]: std::task::Poll::Pending
+ pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
+ let result = f();
+
+ if result.is_pending() {
+ self.clear_ready();
+ }
+
+ result
+ }
+}
+
+impl<T: AsRawFd> Drop for AsyncFd<T> {
+ fn drop(&mut self) {
+ if let Some(driver) = self.handle.inner() {
+ if let Some(inner) = self.inner.as_ref() {
+ let fd = inner.as_raw_fd();
+ let _ = driver.deregister_source(&mut SourceFd(&fd));
+ }
+ }
+ }
+}
+
+impl<T: AsRawFd> AsyncFd<T> {
+ /// Creates an AsyncFd backed by (and taking ownership of) an object
+ /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
+ /// time of creation.
+ ///
+ /// This function must be called in the context of a tokio runtime.
+ pub fn new(inner: T) -> io::Result<Self>
+ where
+ T: AsRawFd,
+ {
+ Self::new_with_handle(inner, Handle::current())
+ }
+
+ pub(crate) fn new_with_handle(inner: T, handle: Handle) -> io::Result<Self> {
+ let fd = inner.as_raw_fd();
+
+ let shared = if let Some(inner) = handle.inner() {
+ inner.add_source(&mut SourceFd(&fd), ALL_INTEREST)?
+ } else {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to find event loop",
+ ));
+ };
+
+ Ok(AsyncFd {
+ handle,
+ shared,
+ inner: Some(inner),
+ })
+ }
+
+ /// Returns a shared reference to the backing object of this [`AsyncFd`]
+ #[inline]
+ pub fn get_ref(&self) -> &T {
+ self.inner.as_ref().unwrap()
+ }
+
+ /// Returns a mutable reference to the backing object of this [`AsyncFd`]
+ #[inline]
+ pub fn get_mut(&mut self) -> &mut T {
+ self.inner.as_mut().unwrap()
+ }
+
+ /// Deregisters this file descriptor, and returns ownership of the backing
+ /// object.
+ pub fn into_inner(mut self) -> T {
+ self.inner.take().unwrap()
+ }
+
+ /// Polls for read readiness. This function retains the waker for the last
+ /// context that called [`poll_read_ready`]; it therefore can only be used
+ /// by a single task at a time (however, [`poll_write_ready`] retains a
+ /// second, independent waker).
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`readable`] is not feasible. Where possible, using [`readable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// [`poll_read_ready`]: method@Self::poll_read_ready
+ /// [`poll_write_ready`]: method@Self::poll_write_ready
+ /// [`readable`]: method@Self::readable
+ pub fn poll_read_ready<'a>(
+ &'a self,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
+ let event = ready!(self.shared.poll_readiness(cx, Direction::Read));
+
+ if !self.handle.is_alive() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "IO driver has terminated",
+ ))
+ .into();
+ }
+
+ Ok(AsyncFdReadyGuard {
+ async_fd: self,
+ event: Some(event),
+ })
+ .into()
+ }
+
+ /// Polls for write readiness. This function retains the waker for the last
+ /// context that called [`poll_write_ready`]; it therefore can only be used
+ /// by a single task at a time (however, [`poll_read_ready`] retains a
+ /// second, independent waker).
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`writable`] is not feasible. Where possible, using [`writable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// [`poll_read_ready`]: method@Self::poll_read_ready
+ /// [`poll_write_ready`]: method@Self::poll_write_ready
+ /// [`writable`]: method@Self::writable
+ pub fn poll_write_ready<'a>(
+ &'a self,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
+ let event = ready!(self.shared.poll_readiness(cx, Direction::Write));
+
+ if !self.handle.is_alive() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "IO driver has terminated",
+ ))
+ .into();
+ }
+
+ Ok(AsyncFdReadyGuard {
+ async_fd: self,
+ event: Some(event),
+ })
+ .into()
+ }
+
+ async fn readiness(&self, interest: mio::Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
+ let event = self.shared.readiness(interest);
+
+ if !self.handle.is_alive() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "IO driver has terminated",
+ ));
+ }
+
+ let event = event.await;
+ Ok(AsyncFdReadyGuard {
+ async_fd: self,
+ event: Some(event),
+ })
+ }
+
+ /// Waits for the file descriptor to become readable, returning a
+ /// [`AsyncFdReadyGuard`] that must be dropped to resume read-readiness polling.
+ ///
+ /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
+ pub async fn readable(&self) -> io::Result<AsyncFdReadyGuard<'_, T>> {
+ self.readiness(mio::Interest::READABLE).await
+ }
+
+ /// Waits for the file descriptor to become writable, returning a
+ /// [`AsyncFdReadyGuard`] that must be dropped to resume write-readiness polling.
+ ///
+ /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
+ pub async fn writable(&self) -> io::Result<AsyncFdReadyGuard<'_, T>> {
+ self.readiness(mio::Interest::WRITABLE).await
+ }
+}
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
index cd82b26..a0d8e6f 100644
--- a/src/io/driver/mod.rs
+++ b/src/io/driver/mod.rs
@@ -7,8 +7,8 @@ mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
use crate::park::{Park, Unpark};
-use crate::util::bit;
use crate::util::slab::{self, Slab};
+use crate::{loom::sync::Mutex, util::bit};
use std::fmt;
use std::io;
@@ -25,8 +25,10 @@ pub(crate) struct Driver {
events: Option<mio::Events>,
/// Primary slab handle containing the state for each resource registered
- /// with this driver.
- resources: Slab<ScheduledIo>,
+ /// with this driver. During Drop this is moved into the Inner structure, so
+ /// this is an Option to allow it to be vacated (until Drop this is always
+ /// Some)
+ resources: Option<Slab<ScheduledIo>>,
/// The system event queue
poll: mio::Poll,
@@ -47,6 +49,14 @@ pub(crate) struct ReadyEvent {
}
pub(super) struct Inner {
+ /// Primary slab handle containing the state for each resource registered
+ /// with this driver.
+ ///
+ /// The ownership of this slab is moved into this structure during
+ /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
+ /// without risking new ones being registered in the meantime.
+ resources: Mutex<Option<Slab<ScheduledIo>>>,
+
/// Registers I/O resources
registry: mio::Registry,
@@ -104,9 +114,10 @@ impl Driver {
Ok(Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
- resources: slab,
poll,
+ resources: Some(slab),
inner: Arc::new(Inner {
+ resources: Mutex::new(None),
registry,
io_dispatch: allocator,
waker,
@@ -133,7 +144,7 @@ impl Driver {
self.tick = self.tick.wrapping_add(1);
if self.tick == COMPACT_INTERVAL {
- self.resources.compact();
+ self.resources.as_mut().unwrap().compact()
}
let mut events = self.events.take().expect("i/o driver event store missing");
@@ -163,7 +174,9 @@ impl Driver {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
- let io = match self.resources.get(addr) {
+ let resources = self.resources.as_mut().unwrap();
+
+ let io = match resources.get(addr) {
Some(io) => io,
None => return,
};
@@ -181,12 +194,22 @@ impl Driver {
impl Drop for Driver {
fn drop(&mut self) {
- self.resources.for_each(|io| {
- // If a task is waiting on the I/O resource, notify it. The task
- // will then attempt to use the I/O resource and fail due to the
- // driver being shutdown.
- io.wake(Ready::ALL);
- })
+ (*self.inner.resources.lock()) = self.resources.take();
+ }
+}
+
+impl Drop for Inner {
+ fn drop(&mut self) {
+ let resources = self.resources.lock().take();
+
+ if let Some(mut slab) = resources {
+ slab.for_each(|io| {
+ // If a task is waiting on the I/O resource, notify it. The task
+ // will then attempt to use the I/O resource and fail due to the
+ // driver being shutdown.
+ io.shutdown();
+ });
+ }
}
}
@@ -267,6 +290,12 @@ impl Handle {
pub(super) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
+
+ cfg_net_unix! {
+ pub(super) fn is_alive(&self) -> bool {
+ self.inner.strong_count() > 0
+ }
+ }
}
impl Unpark for Handle {
diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs
index b1354a0..3aefb37 100644
--- a/src/io/driver/scheduled_io.rs
+++ b/src/io/driver/scheduled_io.rs
@@ -1,4 +1,4 @@
-use super::{Direction, Ready, ReadyEvent, Tick};
+use super::{Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
@@ -7,6 +7,8 @@ use crate::util::slab::Entry;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
+use super::Direction;
+
cfg_io_readiness! {
use crate::util::linked_list::{self, LinkedList};
@@ -41,6 +43,9 @@ struct Waiters {
/// Waker used for AsyncWrite
writer: Option<Waker>,
+
+ /// True if this ScheduledIo has been killed due to IO driver shutdown
+ is_shutdown: bool,
}
cfg_io_readiness! {
@@ -121,6 +126,12 @@ impl ScheduledIo {
GENERATION.unpack(self.readiness.load(Acquire))
}
+ /// Invoked when the IO driver is shut down; forces this ScheduledIo into a
+ /// permanently ready state.
+ pub(super) fn shutdown(&self) {
+ self.wake0(Ready::ALL, true)
+ }
+
/// Sets the readiness on this `ScheduledIo` by invoking the given closure on
/// the current value, returning the previous readiness value.
///
@@ -197,6 +208,10 @@ impl ScheduledIo {
/// than 32 wakers to notify, if the stack array fills up, the lock is
/// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
+ self.wake0(ready, false);
+ }
+
+ fn wake0(&self, ready: Ready, shutdown: bool) {
const NUM_WAKERS: usize = 32;
let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
@@ -204,6 +219,8 @@ impl ScheduledIo {
let mut waiters = self.waiters.lock();
+ waiters.is_shutdown |= shutdown;
+
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
@@ -288,7 +305,12 @@ impl ScheduledIo {
// taking the waiters lock
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
- if ready.is_empty() {
+ if waiters.is_shutdown {
+ Poll::Ready(ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ ready: direction.mask(),
+ })
+ } else if ready.is_empty() {
Poll::Pending
} else {
Poll::Ready(ReadyEvent {
@@ -401,7 +423,12 @@ cfg_io_readiness! {
let mut waiters = scheduled_io.waiters.lock();
let curr = scheduled_io.readiness.load(SeqCst);
- let ready = Ready::from_usize(READINESS.unpack(curr));
+ let mut ready = Ready::from_usize(READINESS.unpack(curr));
+
+ if waiters.is_shutdown {
+ ready = Ready::ALL;
+ }
+
let ready = ready.intersection(interest);
if !ready.is_empty() {
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 9191bbc..20d9223 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -207,11 +207,21 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom};
cfg_io_driver! {
pub(crate) mod driver;
+ mod registration;
+
mod poll_evented;
+
#[cfg(not(loom))]
pub(crate) use poll_evented::PollEvented;
+}
- mod registration;
+cfg_net_unix! {
+ mod async_fd;
+
+ pub mod unix {
+ //! Asynchronous IO structures specific to Unix-like operating systems.
+ pub use super::async_fd::{AsyncFd, AsyncFdReadyGuard};
+ }
}
cfg_io_std! {
diff --git a/src/io/read_buf.rs b/src/io/read_buf.rs
index b64d95c..486b69b 100644
--- a/src/io/read_buf.rs
+++ b/src/io/read_buf.rs
@@ -10,8 +10,8 @@ use std::mem::{self, MaybeUninit};
/// This type is a sort of "double cursor". It tracks three regions in the
/// buffer: a region at the beginning of the buffer that has been logically
/// filled with data, a region that has been initialized at some point but not
-/// yet logically filled, and a region at the end that is fully uninitialized.
-/// The filled region is guaranteed to be a subset of the initialized region.
+/// yet logically filled, and a region at the end that may be uninitialized.
+/// The filled region is guaranteed to be a subset of the initialized region.
///
/// In summary, the contents of the buffer can be visualized as:
///
@@ -20,6 +20,10 @@ use std::mem::{self, MaybeUninit};
/// [ filled | unfilled ]
/// [ initialized | uninitialized ]
/// ```
+///
+/// It is undefined behavior to de-initialize any bytes from the uninitialized
+/// region, since it is merely unknown whether this region is uninitialized or
+/// not, and if part of it turns out to be initialized, it must stay initialized.
pub struct ReadBuf<'a> {
buf: &'a mut [MaybeUninit<u8>],
filled: usize,
@@ -115,6 +119,7 @@ impl<'a> ReadBuf<'a> {
/// # Safety
///
/// The caller must not de-initialize portions of the buffer that have already been initialized.
+ /// This includes any bytes in the region marked as uninitialized by `ReadBuf`.
#[inline]
pub unsafe fn unfilled_mut(&mut self) -> &mut [MaybeUninit<u8>] {
&mut self.buf[self.filled..]
diff --git a/src/io/util/async_read_ext.rs b/src/io/util/async_read_ext.rs
index 0ab66c2..1f918f1 100644
--- a/src/io/util/async_read_ext.rs
+++ b/src/io/util/async_read_ext.rs
@@ -185,7 +185,7 @@ cfg_io_util! {
///
/// On a successful read, the number of read bytes is returned. If the
/// supplied buffer is not empty and the function returns `Ok(0)` then
- /// the source as reached an "end-of-file" event.
+ /// the source has reached an "end-of-file" event.
///
/// # Errors
///
diff --git a/src/lib.rs b/src/lib.rs
index 66e266c..b14ae72 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,4 @@
-#![doc(html_root_url = "https://docs.rs/tokio/0.3.1")]
+#![doc(html_root_url = "https://docs.rs/tokio/0.3.2")]
#![allow(
clippy::cognitive_complexity,
clippy::large_enum_variant,
@@ -305,7 +305,8 @@
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
//! - `io-util`: Enables the IO based `Ext` traits.
//! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types.
-//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`.
+//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`,
+//! as well as (on Unix-like systems) `AsyncFd`
//! - `time`: Enables `tokio::time` types and allows the schedulers to enable
//! the built in timer.
//! - `process`: Enables `tokio::process` types.
diff --git a/src/loom/std/mod.rs b/src/loom/std/mod.rs
index 9525286..414ef90 100644
--- a/src/loom/std/mod.rs
+++ b/src/loom/std/mod.rs
@@ -74,7 +74,7 @@ pub(crate) mod sync {
pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
- pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
+ pub(crate) use std::sync::atomic::{fence, spin_loop_hint, AtomicBool, Ordering};
}
}
diff --git a/src/loom/std/parking_lot.rs b/src/loom/std/parking_lot.rs
index c03190f..8448bed 100644
--- a/src/loom/std/parking_lot.rs
+++ b/src/loom/std/parking_lot.rs
@@ -43,6 +43,11 @@ impl<T> Mutex<T> {
self.0.try_lock()
}
+ #[inline]
+ pub(crate) fn get_mut(&mut self) -> &mut T {
+ self.0.get_mut()
+ }
+
// Note: Additional methods `is_poisoned` and `into_inner`, can be
// provided here as needed.
}
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs
index 3f9bca0..118dca2 100644
--- a/src/net/tcp/listener.rs
+++ b/src/net/tcp/listener.rs
@@ -177,7 +177,7 @@ impl TcpListener {
/// current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
- /// The caller is responsble to ensure that `poll_accept` is called from a
+ /// The caller is responsible to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
loop {
@@ -196,14 +196,14 @@ impl TcpListener {
}
}
- /// Creates a new TCP listener from the standard library's TCP listener.
+ /// Creates new `TcpListener` from a `std::net::TcpListener`.
///
/// This function is intended to be used to wrap a TCP listener from the
/// standard library in the Tokio equivalent. The conversion assumes nothing
/// about the underlying listener; it is left up to the user to set it in
/// non-blocking mode.
///
- /// This API is typically paired with the `net2` crate and the `TcpBuilder`
+ /// This API is typically paired with the `socket2` crate and the `Socket`
/// type to build up and customize a listener before it's shipped off to the
/// backing event loop. This allows configuration of options like
/// `SO_REUSEPORT`, binding to multiple addresses, etc.
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index f90e9a3..204a194 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -145,8 +145,10 @@ impl TcpStream {
/// Creates new `TcpStream` from a `std::net::TcpStream`.
///
- /// This function will convert a TCP stream created by the standard library
- /// to a TCP stream ready to be used with the provided event loop handle.
+ /// This function is intended to be used to wrap a TCP stream from the
+ /// standard library in the Tokio equivalent. The conversion assumes nothing
+ /// about the underlying stream; it is left up to the user to set it in
+ /// non-blocking mode.
///
/// # Examples
///
diff --git a/src/net/udp/socket.rs b/src/net/udp/socket.rs
index d13e92b..ce6f2ca 100644
--- a/src/net/udp/socket.rs
+++ b/src/net/udp/socket.rs
@@ -1,10 +1,11 @@
-use crate::io::PollEvented;
+use crate::io::{PollEvented, ReadBuf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::task::{Context, Poll};
cfg_net! {
/// A UDP socket
@@ -157,13 +158,14 @@ impl UdpSocket {
Ok(UdpSocket { io })
}
- /// Creates a new `UdpSocket` from the previously bound socket provided.
+ /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`.
///
- /// The socket given will be registered with the event loop that `handle`
- /// is associated with. This function requires that `socket` has previously
- /// been bound to an address to work correctly.
+ /// This function is intended to be used to wrap a UDP socket from the
+ /// standard library in the Tokio equivalent. The conversion assumes nothing
+ /// about the underlying socket; it is left up to the user to set it in
+ /// non-blocking mode.
///
- /// This can be used in conjunction with net2's `UdpBuilder` interface to
+ /// This can be used in conjunction with socket2's `Socket` interface to
/// configure a socket before it's handed off, such as setting options like
/// `reuse_address` or binding to multiple addresses.
///
@@ -272,6 +274,42 @@ impl UdpSocket {
.await
}
+ /// Attempts to send data on the socket to the remote address to which it was previously
+ /// `connect`ed.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address. The future
+ /// will resolve to an error if the socket is not connected.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the send direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not available to write
+ /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`connect`]: method@Self::connect
+ pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
+
+ match self.io.get_ref().send(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ x => return Poll::Ready(x),
+ }
+ }
+ }
+
/// Try to send data on the socket to the remote address to which it is
/// connected.
///
@@ -304,6 +342,55 @@ impl UdpSocket {
.await
}
+ /// Attempts to receive a single datagram message on the socket from the remote
+ /// address to which it is `connect`ed.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address. This method
+ /// resolves to an error if the socket is not connected.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`connect`]: method@Self::connect
+ pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ // Safety: will not read the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+ match self.io.get_ref().recv(b) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok(n) => {
+ // Safety: We trust `recv` to have filled up `n` bytes
+ // in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
+ }
+
/// Returns a future that sends data on the socket to the given address.
/// On success, the future will resolve to the number of bytes written.
///
@@ -337,6 +424,41 @@ impl UdpSocket {
}
}
+ /// Attempts to send data on the socket to a given address.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the send direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to write
+ /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_send_to(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ target: &SocketAddr,
+ ) -> Poll<io::Result<usize>> {
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
+
+ match self.io.get_ref().send_to(buf, *target) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ x => return Poll::Ready(x),
+ }
+ }
+ }
+
/// Try to send data on the socket to the given address, but if the send is blocked
/// this will return right away.
///
@@ -403,6 +525,142 @@ impl UdpSocket {
.await
}
+ /// Attempts to receive a single datagram on the socket.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_recv_from(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<SocketAddr>> {
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ // Safety: will not read the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+ match self.io.get_ref().recv_from(b) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok((n, addr)) => {
+ // Safety: We trust `recv` to have filled up `n` bytes
+ // in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ return Poll::Ready(Ok(addr));
+ }
+ }
+ }
+ }
+
+ /// Receives data from the socket, without removing it from the input queue.
+ /// On success, returns the number of bytes read and the address from whence
+ /// the data came.
+ ///
+ /// # Notes
+ ///
+ /// On Windows, if the data is larger than the buffer specified, the buffer
+ /// is filled with the first part of the data, and peek_from returns the error
+ /// WSAEMSGSIZE(10040). The excess data is lost.
+ /// Make sure to always use a sufficiently large buffer to hold the
+ /// maximum UDP packet size, which can be up to 65536 bytes in size.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
+ /// let mut buf = [0u8; 32];
+ /// let (len, addr) = sock.peek_from(&mut buf).await?;
+ /// println!("peeked {:?} bytes from {:?}", len, addr);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .async_io(mio::Interest::READABLE, |sock| sock.peek_from(buf))
+ .await
+ }
+
+ /// Receives data from the socket, without removing it from the input queue.
+ /// On success, returns the number of bytes read.
+ ///
+ /// # Notes
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup
+ ///
+ /// On Windows, if the data is larger than the buffer specified, the buffer
+ /// is filled with the first part of the data, and peek returns the error
+ /// WSAEMSGSIZE(10040). The excess data is lost.
+ /// Make sure to always use a sufficiently large buffer to hold the
+ /// maximum UDP packet size, which can be up to 65536 bytes in size.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_peek_from(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<SocketAddr>> {
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ // Safety: will not read the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+ match self.io.get_ref().peek_from(b) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ Ok((n, addr)) => {
+ // Safety: We trust `recv` to have filled up `n` bytes
+ // in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ return Poll::Ready(Ok(addr));
+ }
+ }
+ }
+ }
+
/// Gets the value of the `SO_BROADCAST` option for this socket.
///
/// For more information about this option, see [`set_broadcast`].
@@ -564,6 +822,29 @@ impl UdpSocket {
pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
self.io.get_ref().leave_multicast_v6(multiaddr, interface)
}
+
+ /// Returns the value of the `SO_ERROR` option.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UdpSocket;
+ ///
+ /// // Create a socket
+ /// let socket = UdpSocket::bind("0.0.0.0:8080").await?;
+ ///
+ /// if let Ok(Some(err)) = socket.take_error() {
+ /// println!("Got error: {:?}", err);
+ /// }
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.get_ref().take_error()
+ }
}
impl TryFrom<std::net::UdpSocket> for UdpSocket {
diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs
index 3ae66d1..051c0d1 100644
--- a/src/net/unix/datagram/socket.rs
+++ b/src/net/unix/datagram/socket.rs
@@ -149,11 +149,12 @@ impl UnixDatagram {
Ok((a, b))
}
- /// Consumes a `UnixDatagram` in the standard library and returns a
- /// nonblocking `UnixDatagram` from this crate.
+ /// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`.
///
- /// The returned datagram will be associated with the given event loop
- /// specified by `handle` and is ready to perform I/O.
+ /// This function is intended to be used to wrap a UnixDatagram from the
+ /// standard library in the Tokio equivalent. The conversion assumes
+ /// nothing about the underlying datagram; it is left up to the user to set
+ /// it in non-blocking mode.
///
/// # Panics
///
diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs
index b272645..8f0d4c0 100644
--- a/src/net/unix/listener.rs
+++ b/src/net/unix/listener.rs
@@ -68,11 +68,12 @@ impl UnixListener {
Ok(UnixListener { io })
}
- /// Consumes a `UnixListener` in the standard library and returns a
- /// nonblocking `UnixListener` from this crate.
+ /// Creates new `UnixListener` from a `std::os::unix::net::UnixListener `.
///
- /// The returned listener will be associated with the given event loop
- /// specified by `handle` and is ready to perform I/O.
+ /// This function is intended to be used to wrap a UnixListener from the
+ /// standard library in the Tokio equivalent. The conversion assumes
+ /// nothing about the underlying listener; it is left up to the user to set
+ /// it in non-blocking mode.
///
/// # Panics
///
@@ -115,7 +116,7 @@ impl UnixListener {
/// the current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
- /// The caller is responsble to ensure that `poll_accept` is called from a
+ /// The caller is responsible to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
loop {
diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs
index 5138077..2f3dd12 100644
--- a/src/net/unix/stream.rs
+++ b/src/net/unix/stream.rs
@@ -43,11 +43,12 @@ impl UnixStream {
Ok(stream)
}
- /// Consumes a `UnixStream` in the standard library and returns a
- /// nonblocking `UnixStream` from this crate.
+ /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
- /// The returned stream will be associated with the given event loop
- /// specified by `handle` and is ready to perform I/O.
+ /// This function is intended to be used to wrap a UnixStream from the
+ /// standard library in the Tokio equivalent. The conversion assumes
+ /// nothing about the underlying stream; it is left up to the user to set
+ /// it in non-blocking mode.
///
/// # Panics
///
diff --git a/src/sync/oneshot.rs b/src/sync/oneshot.rs
index 951ab71..ece9aba 100644
--- a/src/sync/oneshot.rs
+++ b/src/sync/oneshot.rs
@@ -196,54 +196,6 @@ impl<T> Sender<T> {
Ok(())
}
- fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- let inner = self.inner.as_ref().unwrap();
-
- let mut state = State::load(&inner.state, Acquire);
-
- if state.is_closed() {
- coop.made_progress();
- return Poll::Ready(());
- }
-
- if state.is_tx_task_set() {
- let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
-
- if !will_notify {
- state = State::unset_tx_task(&inner.state);
-
- if state.is_closed() {
- // Set the flag again so that the waker is released in drop
- State::set_tx_task(&inner.state);
- coop.made_progress();
- return Ready(());
- } else {
- unsafe { inner.drop_tx_task() };
- }
- }
- }
-
- if !state.is_tx_task_set() {
- // Attempt to set the task
- unsafe {
- inner.set_tx_task(cx);
- }
-
- // Update the state
- state = State::set_tx_task(&inner.state);
-
- if state.is_closed() {
- coop.made_progress();
- return Ready(());
- }
- }
-
- Pending
- }
-
/// Waits for the associated [`Receiver`] handle to close.
///
/// A [`Receiver`] is closed by either calling [`close`] explicitly or the
@@ -285,8 +237,6 @@ impl<T> Sender<T> {
/// use tokio::sync::oneshot;
/// use tokio::time::{self, Duration};
///
- /// use futures::{select, FutureExt};
- ///
/// async fn compute() -> String {
/// // Complex computation returning a `String`
/// # "hello".to_string()
@@ -297,12 +247,14 @@ impl<T> Sender<T> {
/// let (mut tx, rx) = oneshot::channel();
///
/// tokio::spawn(async move {
- /// select! {
- /// _ = tx.closed().fuse() => {
+ /// tokio::select! {
+ /// _ = tx.closed() => {
/// // The receiver dropped, no need to do any further work
/// }
- /// value = compute().fuse() => {
- /// tx.send(value).unwrap()
+ /// value = compute() => {
+ /// // The send can fail if the channel was closed at the exact same
+ /// // time as when compute() finished, so just ignore the failure.
+ /// let _ = tx.send(value);
/// }
/// }
/// });
@@ -350,6 +302,94 @@ impl<T> Sender<T> {
let state = State::load(&inner.state, Acquire);
state.is_closed()
}
+
+ /// Check whether the oneshot channel has been closed, and if not, schedules the
+ /// `Waker` in the provided `Context` to receive a notification when the channel is
+ /// closed.
+ ///
+ /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
+ /// [`Receiver`] value is dropped.
+ ///
+ /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
+ /// to the most recent call will be scheduled to receive a wakeup.
+ ///
+ /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
+ /// [`close`]: fn@crate::sync::oneshot::Receiver::close
+ ///
+ /// # Return value
+ ///
+ /// This function returns:
+ ///
+ /// * `Poll::Pending` if the channel is still open.
+ /// * `Poll::Ready(())` if the channel is closed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::oneshot;
+ ///
+ /// use futures::future::poll_fn;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = oneshot::channel::<()>();
+ ///
+ /// tokio::spawn(async move {
+ /// rx.close();
+ /// });
+ ///
+ /// poll_fn(|cx| tx.poll_closed(cx)).await;
+ ///
+ /// println!("the receiver dropped");
+ /// }
+ /// ```
+ pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ let inner = self.inner.as_ref().unwrap();
+
+ let mut state = State::load(&inner.state, Acquire);
+
+ if state.is_closed() {
+ coop.made_progress();
+ return Poll::Ready(());
+ }
+
+ if state.is_tx_task_set() {
+ let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };
+
+ if !will_notify {
+ state = State::unset_tx_task(&inner.state);
+
+ if state.is_closed() {
+ // Set the flag again so that the waker is released in drop
+ State::set_tx_task(&inner.state);
+ coop.made_progress();
+ return Ready(());
+ } else {
+ unsafe { inner.drop_tx_task() };
+ }
+ }
+ }
+
+ if !state.is_tx_task_set() {
+ // Attempt to set the task
+ unsafe {
+ inner.set_tx_task(cx);
+ }
+
+ // Update the state
+ state = State::set_tx_task(&inner.state);
+
+ if state.is_closed() {
+ coop.made_progress();
+ return Ready(());
+ }
+ }
+
+ Pending
+ }
}
impl<T> Drop for Sender<T> {
diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs
index a84c4c1..750765f 100644
--- a/src/sync/rwlock.rs
+++ b/src/sync/rwlock.rs
@@ -375,8 +375,6 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
/// let n = n.downgrade();
/// assert_eq!(*n, 1, "downgrade is atomic");
///
- /// assert_eq!(*lock.read().await, 1, "additional readers can obtain locks");
- ///
/// drop(n);
/// handle.await.unwrap();
/// assert_eq!(*lock.read().await, 2, "second writer obtained write lock");
@@ -389,7 +387,8 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
// Release all but one of the permits held by the write guard
s.release(MAX_READS - 1);
-
+ // NB: Forget to avoid drop impl from being called.
+ mem::forget(self);
RwLockReadGuard {
s,
data,
diff --git a/src/sync/tests/loom_rwlock.rs b/src/sync/tests/loom_rwlock.rs
index 48d06e1..2834a26 100644
--- a/src/sync/tests/loom_rwlock.rs
+++ b/src/sync/tests/loom_rwlock.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
#[test]
fn concurrent_write() {
- let mut b = loom::model::Builder::new();
+ let b = loom::model::Builder::new();
b.check(|| {
let rwlock = Arc::new(RwLock::<u32>::new(0));
@@ -37,7 +37,7 @@ fn concurrent_write() {
#[test]
fn concurrent_read_write() {
- let mut b = loom::model::Builder::new();
+ let b = loom::model::Builder::new();
b.check(|| {
let rwlock = Arc::new(RwLock::<u32>::new(0));
@@ -76,3 +76,24 @@ fn concurrent_read_write() {
assert_eq!(10, *guard);
});
}
+#[test]
+fn downgrade() {
+ loom::model(|| {
+ let lock = Arc::new(RwLock::new(1));
+
+ let n = block_on(lock.write());
+
+ let cloned_lock = lock.clone();
+ let handle = thread::spawn(move || {
+ let mut guard = block_on(cloned_lock.write());
+ *guard = 2;
+ });
+
+ let n = n.downgrade();
+ assert_eq!(*n, 1);
+
+ drop(n);
+ handle.join().unwrap();
+ assert_eq!(*block_on(lock.read()), 2);
+ });
+}
diff --git a/src/sync/tests/mod.rs b/src/sync/tests/mod.rs
index a78be6f..c5d5601 100644
--- a/src/sync/tests/mod.rs
+++ b/src/sync/tests/mod.rs
@@ -12,4 +12,5 @@ cfg_loom! {
mod loom_oneshot;
mod loom_semaphore_batch;
mod loom_watch;
+ mod loom_rwlock;
}
diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs
new file mode 100644
index 0000000..0303eff
--- /dev/null
+++ b/tests/io_async_fd.rs
@@ -0,0 +1,604 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(unix, feature = "full"))]
+
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
+use std::time::Duration;
+use std::{
+ future::Future,
+ io::{self, ErrorKind, Read, Write},
+ task::{Context, Waker},
+};
+
+use nix::errno::Errno;
+use nix::unistd::{close, read, write};
+
+use futures::{poll, FutureExt};
+
+use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
+use tokio_test::{assert_err, assert_pending};
+
+struct TestWaker {
+ inner: Arc<TestWakerInner>,
+ waker: Waker,
+}
+
+#[derive(Default)]
+struct TestWakerInner {
+ awoken: AtomicBool,
+}
+
+impl futures::task::ArcWake for TestWakerInner {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.awoken.store(true, Ordering::SeqCst);
+ }
+}
+
+impl TestWaker {
+ fn new() -> Self {
+ let inner: Arc<TestWakerInner> = Default::default();
+
+ Self {
+ inner: inner.clone(),
+ waker: futures::task::waker(inner),
+ }
+ }
+
+ fn awoken(&self) -> bool {
+ self.inner.awoken.swap(false, Ordering::SeqCst)
+ }
+
+ fn context(&self) -> Context<'_> {
+ Context::from_waker(&self.waker)
+ }
+}
+
+fn is_blocking(e: &nix::Error) -> bool {
+ Some(Errno::EAGAIN) == e.as_errno()
+}
+
+#[derive(Debug)]
+struct FileDescriptor {
+ fd: RawFd,
+}
+
+impl AsRawFd for FileDescriptor {
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd
+ }
+}
+
+impl Read for &FileDescriptor {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ match read(self.fd, buf) {
+ Ok(n) => Ok(n),
+ Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
+ Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
+ }
+ }
+}
+
+impl Read for FileDescriptor {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ (self as &Self).read(buf)
+ }
+}
+
+impl Write for &FileDescriptor {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ match write(self.fd, buf) {
+ Ok(n) => Ok(n),
+ Err(e) if is_blocking(&e) => Err(ErrorKind::WouldBlock.into()),
+ Err(e) => Err(io::Error::new(ErrorKind::Other, e)),
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl Write for FileDescriptor {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ (self as &Self).write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (self as &Self).flush()
+ }
+}
+
+impl Drop for FileDescriptor {
+ fn drop(&mut self) {
+ let _ = close(self.fd);
+ }
+}
+
+fn set_nonblocking(fd: RawFd) {
+ use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
+
+ let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
+
+ if flags < 0 {
+ panic!(
+ "bad return value from fcntl(F_GETFL): {} ({:?})",
+ flags,
+ nix::Error::last()
+ );
+ }
+
+ let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
+
+ nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
+}
+
+fn socketpair() -> (FileDescriptor, FileDescriptor) {
+ use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
+
+ let (fd_a, fd_b) = socket::socketpair(
+ AddressFamily::Unix,
+ SockType::Stream,
+ None,
+ SockFlag::empty(),
+ )
+ .expect("socketpair");
+ let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
+
+ set_nonblocking(fds.0.fd);
+ set_nonblocking(fds.1.fd);
+
+ fds
+}
+
+fn drain(mut fd: &FileDescriptor) {
+ let mut buf = [0u8; 512];
+
+ loop {
+ match fd.read(&mut buf[..]) {
+ Err(e) if e.kind() == ErrorKind::WouldBlock => break,
+ Ok(0) => panic!("unexpected EOF"),
+ Err(e) => panic!("unexpected error: {:?}", e),
+ Ok(_) => continue,
+ }
+ }
+}
+
+#[tokio::test]
+async fn initially_writable() {
+ let (a, b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+ let afd_b = AsyncFd::new(b).unwrap();
+
+ afd_a.writable().await.unwrap().clear_ready();
+ afd_b.writable().await.unwrap().clear_ready();
+
+ futures::select_biased! {
+ _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
+ _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
+ _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
+ }
+}
+
+#[tokio::test]
+async fn reset_readable() {
+ let (a, mut b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ b.write_all(b"0").unwrap();
+
+ let mut guard = readable.await.unwrap();
+
+ guard.with_io(|| afd_a.get_ref().read(&mut [0])).unwrap();
+
+ // `a` is not readable, but the reactor still thinks it is
+ // (because we have not observed a not-ready error yet)
+ afd_a.readable().await.unwrap().retain_ready();
+
+ // Explicitly clear the ready state
+ guard.clear_ready();
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ b.write_all(b"0").unwrap();
+
+ // We can observe the new readable event
+ afd_a.readable().await.unwrap().clear_ready();
+}
+
+#[tokio::test]
+async fn reset_writable() {
+ let (a, b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let mut guard = afd_a.writable().await.unwrap();
+
+ // Write until we get a WouldBlock. This also clears the ready state.
+ loop {
+ if let Err(e) = guard.with_io(|| afd_a.get_ref().write(&[0; 512][..])) {
+ assert_eq!(ErrorKind::WouldBlock, e.kind());
+ break;
+ }
+ }
+
+ // Writable state should be cleared now.
+ let writable = afd_a.writable();
+ tokio::pin!(writable);
+
+ tokio::select! {
+ _ = writable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ // Read from the other side; we should become writable now.
+ drain(&b);
+
+ let _ = writable.await.unwrap();
+}
+
+#[derive(Debug)]
+struct ArcFd<T>(Arc<T>);
+impl<T: AsRawFd> AsRawFd for ArcFd<T> {
+ fn as_raw_fd(&self) -> RawFd {
+ self.0.as_raw_fd()
+ }
+}
+
+#[tokio::test]
+async fn drop_closes() {
+ let (a, mut b) = socketpair();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ std::mem::drop(afd_a);
+
+ assert_eq!(0, b.read(&mut [0]).unwrap());
+
+ // into_inner does not close the fd
+
+ let (a, mut b) = socketpair();
+ let afd_a = AsyncFd::new(a).unwrap();
+ let _a: FileDescriptor = afd_a.into_inner();
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ // Drop closure behavior is delegated to the inner object
+ let (a, mut b) = socketpair();
+ let arc_fd = Arc::new(a);
+ let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
+ std::mem::drop(afd_a);
+
+ assert_eq!(
+ ErrorKind::WouldBlock,
+ b.read(&mut [0]).err().unwrap().kind()
+ );
+
+ std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
+}
+
+#[tokio::test]
+async fn with_poll() {
+ use std::task::Poll;
+
+ let (a, mut b) = socketpair();
+
+ b.write_all(b"0").unwrap();
+
+ let afd_a = AsyncFd::new(a).unwrap();
+
+ let mut guard = afd_a.readable().await.unwrap();
+
+ afd_a.get_ref().read_exact(&mut [0]).unwrap();
+
+ // Should not clear the readable state
+ let _ = guard.with_poll(|| Poll::Ready(()));
+
+ // Still readable...
+ let _ = afd_a.readable().await.unwrap();
+
+ // Should clear the readable state
+ let _ = guard.with_poll(|| Poll::Pending::<()>);
+
+ // Assert not readable
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = readable.as_mut() => panic!(),
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {}
+ }
+
+ // Write something down b again and make sure we're reawoken
+ b.write_all(b"0").unwrap();
+ let _ = readable.await.unwrap();
+}
+
+#[tokio::test]
+async fn multiple_waiters() {
+ let (a, mut b) = socketpair();
+ let afd_a = Arc::new(AsyncFd::new(a).unwrap());
+
+ let barrier = Arc::new(tokio::sync::Barrier::new(11));
+
+ let mut tasks = Vec::new();
+ for _ in 0..10 {
+ let afd_a = afd_a.clone();
+ let barrier = barrier.clone();
+
+ let f = async move {
+ let notify_barrier = async {
+ barrier.wait().await;
+ futures::future::pending::<()>().await;
+ };
+
+ futures::select_biased! {
+ guard = afd_a.readable().fuse() => {
+ tokio::task::yield_now().await;
+ guard.unwrap().clear_ready()
+ },
+ _ = notify_barrier.fuse() => unreachable!(),
+ }
+
+ std::mem::drop(afd_a);
+ };
+
+ tasks.push(tokio::spawn(f));
+ }
+
+ let mut all_tasks = futures::future::try_join_all(tasks);
+
+ tokio::select! {
+ r = std::pin::Pin::new(&mut all_tasks) => {
+ r.unwrap(); // propagate panic
+ panic!("Tasks exited unexpectedly")
+ },
+ _ = barrier.wait() => {}
+ };
+
+ b.write_all(b"0").unwrap();
+
+ all_tasks.await.unwrap();
+}
+
+#[tokio::test]
+async fn poll_fns() {
+ let (a, b) = socketpair();
+ let afd_a = Arc::new(AsyncFd::new(a).unwrap());
+ let afd_b = Arc::new(AsyncFd::new(b).unwrap());
+
+ // Fill up the write side of A
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {}
+
+ let waker = TestWaker::new();
+
+ assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
+
+ let afd_a_2 = afd_a.clone();
+ let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
+ let barrier_clone = r_barrier.clone();
+
+ let read_fut = tokio::spawn(async move {
+ // Move waker onto this task first
+ assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
+ .as_ref()
+ .poll_read_ready(cx))));
+ barrier_clone.wait().await;
+
+ let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
+ });
+
+ let afd_a_2 = afd_a.clone();
+ let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
+ let barrier_clone = w_barrier.clone();
+
+ let mut write_fut = tokio::spawn(async move {
+ // Move waker onto this task first
+ assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
+ .as_ref()
+ .poll_write_ready(cx))));
+ barrier_clone.wait().await;
+
+ let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
+ });
+
+ r_barrier.wait().await;
+ w_barrier.wait().await;
+
+ let readable = afd_a.readable();
+ tokio::pin!(readable);
+
+ tokio::select! {
+ _ = &mut readable => unreachable!(),
+ _ = tokio::task::yield_now() => {}
+ }
+
+ // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
+ afd_b.get_ref().write_all(b"0").unwrap();
+
+ let _ = tokio::join!(readable, read_fut);
+
+ // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
+ assert!(!waker.awoken());
+
+ // The writable side should not be awoken
+ tokio::select! {
+ _ = &mut write_fut => unreachable!(),
+ _ = tokio::time::sleep(Duration::from_millis(5)) => {}
+ }
+
+ // Make it writable now
+ drain(afd_b.get_ref());
+
+ // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
+ let _ = write_fut.await;
+}
+
+fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
+ let mut pinned = Box::pin(f);
+
+ assert_pending!(pinned
+ .as_mut()
+ .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
+
+ pinned
+}
+
+fn rt() -> tokio::runtime::Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+}
+
+#[test]
+fn driver_shutdown_wakes_currently_pending() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ let readable = assert_pending(afd_a.readable());
+
+ std::mem::drop(rt);
+
+ // Being awoken by a rt drop does not return an error, currently...
+ let _ = futures::executor::block_on(readable).unwrap();
+
+ // However, attempting to initiate a readiness wait when the rt is dropped is an error
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+}
+
+#[test]
+fn driver_shutdown_wakes_future_pending() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ std::mem::drop(rt);
+
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+}
+
+#[test]
+fn driver_shutdown_wakes_pending_race() {
+ // TODO: make this a loom test
+ for _ in 0..100 {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ let _ = std::thread::spawn(move || std::mem::drop(rt));
+
+ // This may or may not return an error (but will be awoken)
+ let _ = futures::executor::block_on(afd_a.readable());
+
+ // However retrying will always return an error
+ assert_err!(futures::executor::block_on(afd_a.readable()));
+ }
+}
+
+async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
+ futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
+}
+
+async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
+ futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
+}
+
+#[test]
+fn driver_shutdown_wakes_currently_pending_polls() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
+
+ let readable = assert_pending(poll_readable(&afd_a));
+ let writable = assert_pending(poll_writable(&afd_a));
+
+ std::mem::drop(rt);
+
+ // Attempting to poll readiness when the rt is dropped is an error
+ assert_err!(futures::executor::block_on(readable));
+ assert_err!(futures::executor::block_on(writable));
+}
+
+#[test]
+fn driver_shutdown_wakes_poll() {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ std::mem::drop(rt);
+
+ assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
+ assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
+}
+
+#[test]
+fn driver_shutdown_wakes_poll_race() {
+ // TODO: make this a loom test
+ for _ in 0..100 {
+ let rt = rt();
+
+ let (a, _b) = socketpair();
+ let afd_a = {
+ let _enter = rt.enter();
+ AsyncFd::new(a).unwrap()
+ };
+
+ while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
+
+ let _ = std::thread::spawn(move || std::mem::drop(rt));
+
+ // The poll variants will always return an error in this case
+ assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
+ assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
+ }
+}
diff --git a/tests/udp.rs b/tests/udp.rs
index 0bea83a..8b79cb8 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -1,8 +1,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use futures::future::poll_fn;
use std::sync::Arc;
-use tokio::net::UdpSocket;
+use tokio::{io::ReadBuf, net::UdpSocket};
const MSG: &[u8] = b"hello";
const MSG_LEN: usize = MSG.len();
@@ -25,6 +26,24 @@ async fn send_recv() -> std::io::Result<()> {
}
#[tokio::test]
+async fn send_recv_poll() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ sender.connect(receiver.local_addr()?).await?;
+ receiver.connect(sender.local_addr()?).await?;
+
+ poll_fn(|cx| sender.poll_send(cx, MSG)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), MSG);
+ Ok(())
+}
+
+#[tokio::test]
async fn send_to_recv_from() -> std::io::Result<()> {
let sender = UdpSocket::bind("127.0.0.1:0").await?;
let receiver = UdpSocket::bind("127.0.0.1:0").await?;
@@ -41,6 +60,79 @@ async fn send_to_recv_from() -> std::io::Result<()> {
}
#[tokio::test]
+async fn send_to_recv_from_poll() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let receiver_addr = receiver.local_addr()?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), MSG);
+ assert_eq!(addr, sender.local_addr()?);
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_to_peek_from() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let receiver_addr = receiver.local_addr()?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?;
+
+ // peek
+ let mut recv_buf = [0u8; 32];
+ let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
+ assert_eq!(&recv_buf[..n], MSG);
+ assert_eq!(addr, sender.local_addr()?);
+
+ // peek
+ let mut recv_buf = [0u8; 32];
+ let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
+ assert_eq!(&recv_buf[..n], MSG);
+ assert_eq!(addr, sender.local_addr()?);
+
+ let mut recv_buf = [0u8; 32];
+ let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
+ assert_eq!(&recv_buf[..n], MSG);
+ assert_eq!(addr, sender.local_addr()?);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_to_peek_from_poll() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let receiver_addr = receiver.local_addr()?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, &receiver_addr)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), MSG);
+ assert_eq!(addr, sender.local_addr()?);
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), MSG);
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+
+ poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
+ assert_eq!(read.filled(), MSG);
+ Ok(())
+}
+
+#[tokio::test]
async fn split() -> std::io::Result<()> {
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let s = Arc::new(socket);
@@ -88,6 +180,43 @@ async fn split_chan() -> std::io::Result<()> {
Ok(())
}
+#[tokio::test]
+async fn split_chan_poll() -> std::io::Result<()> {
+ // setup UdpSocket that will echo all sent items
+ let socket = UdpSocket::bind("127.0.0.1:0").await?;
+ let addr = socket.local_addr().unwrap();
+ let s = Arc::new(socket);
+ let r = s.clone();
+
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
+ tokio::spawn(async move {
+ while let Some((bytes, addr)) = rx.recv().await {
+ poll_fn(|cx| s.poll_send_to(cx, &bytes, &addr))
+ .await
+ .unwrap();
+ }
+ });
+
+ tokio::spawn(async move {
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ loop {
+ let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
+ tx.send((read.filled().to_vec(), addr)).await.unwrap();
+ }
+ });
+
+ // test that we can send a value and get back some response
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, &addr)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
+ assert_eq!(read.filled(), MSG);
+ Ok(())
+}
+
// # Note
//
// This test is purposely written such that each time `sender` sends data on