aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-10-08 19:49:06 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-10-08 19:49:06 +0000
commitd407740429530ce63c8f55ed9ea2e40de0fc1229 (patch)
tree71a18fec0599d209bd7c1b95140dc75566fa3788
parentb4663de2acba9e8f767216055ba814d0b8d5100d (diff)
parent7c69dbe097f4ba0a1a0aa9e67ed50294b68fa3dc (diff)
downloadtokio-d407740429530ce63c8f55ed9ea2e40de0fc1229.tar.gz
Merge "Upgrade rust/crates/tokio to 1.12.0" am: e16ac718df am: 7c69dbe097
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1833327 Change-Id: I75e8e8b239472bf227fc1464b7d99b917791ffa7
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp2
-rw-r--r--CHANGELOG.md77
-rw-r--r--Cargo.toml8
-rw-r--r--Cargo.toml.orig8
-rw-r--r--METADATA8
-rw-r--r--README.md2
-rw-r--r--patches/test_fix.patch21
-rw-r--r--src/doc/mod.rs1
-rw-r--r--src/io/bsd/poll_aio.rs195
-rw-r--r--src/io/driver/interest.rs20
-rw-r--r--src/io/driver/mod.rs1
-rw-r--r--src/io/driver/ready.rs11
-rw-r--r--src/io/driver/scheduled_io.rs27
-rw-r--r--src/io/mod.rs9
-rw-r--r--src/io/util/async_seek_ext.rs10
-rw-r--r--src/io/util/fill_buf.rs13
-rw-r--r--src/lib.rs18
-rw-r--r--src/loom/std/mod.rs15
-rw-r--r--src/macros/cfg.rs33
-rw-r--r--src/process/mod.rs57
-rw-r--r--src/process/unix/mod.rs19
-rw-r--r--src/process/windows.rs18
-rw-r--r--src/runtime/basic_scheduler.rs47
-rw-r--r--src/runtime/builder.rs131
-rw-r--r--src/runtime/handle.rs24
-rw-r--r--src/runtime/mod.rs11
-rw-r--r--src/runtime/queue.rs16
-rw-r--r--src/runtime/spawner.rs42
-rw-r--r--src/runtime/stats/mock.rs27
-rw-r--r--src/runtime/stats/mod.rs17
-rw-r--r--src/runtime/stats/stats.rs97
-rw-r--r--src/runtime/task/join.rs3
-rw-r--r--src/runtime/tests/loom_queue.rs16
-rw-r--r--src/runtime/tests/queue.rs11
-rw-r--r--src/runtime/thread_pool/mod.rs16
-rw-r--r--src/runtime/thread_pool/worker.rs78
-rw-r--r--src/sync/batch_semaphore.rs16
-rw-r--r--src/sync/mpsc/block.rs6
-rw-r--r--src/sync/mpsc/bounded.rs64
-rw-r--r--src/sync/mpsc/chan.rs48
-rw-r--r--src/sync/mpsc/error.rs24
-rw-r--r--src/sync/mpsc/list.rs42
-rw-r--r--src/sync/mpsc/unbounded.rs54
-rw-r--r--src/sync/notify.rs21
-rw-r--r--src/sync/tests/loom_mpsc.rs56
-rw-r--r--src/sync/watch.rs95
-rw-r--r--src/task/yield_now.rs77
-rw-r--r--src/time/clock.rs36
-rw-r--r--src/time/driver/mod.rs12
-rw-r--r--src/time/driver/sleep.rs26
-rw-r--r--src/util/mod.rs23
-rw-r--r--src/util/trace.rs8
-rw-r--r--src/util/wake_list.rs53
-rw-r--r--tests/fs_file.rs20
-rw-r--r--tests/io_async_fd.rs18
-rw-r--r--tests/io_fill_buf.rs34
-rw-r--r--tests/io_poll_aio.rs375
-rw-r--r--tests/process_kill_on_drop.rs12
-rw-r--r--tests/sync_mpsc.rs119
-rw-r--r--tests/sync_watch.rs15
61 files changed, 2111 insertions, 254 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 755a957..0577e49 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "dd060b16f54ce196b3891042030dedd3abc017c4"
+ "sha1": "1ed89aa5cf7e5b9524b9e08a02030d222fd63417"
}
}
diff --git a/Android.bp b/Android.bp
index 2e08e9e..a89c568 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,7 +23,7 @@ rust_library {
host_supported: true,
crate_name: "tokio",
cargo_env_compat: true,
- cargo_pkg_version: "1.10.1",
+ cargo_pkg_version: "1.12.0",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3b41957..16e44e5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,80 @@
+# 1.12.0 (September 21, 2021)
+
+### Fixed
+
+- mpsc: ensure `try_reserve` error is consistent with `try_send` ([#4119])
+- mpsc: use `spin_loop_hint` instead of `yield_now` ([#4115])
+- sync: make `SendError` field public ([#4097])
+
+### Added
+
+- io: add POSIX AIO on FreeBSD ([#4054])
+- io: add convenience method `AsyncSeekExt::rewind` ([#4107])
+- runtime: add tracing span for `block_on` futures ([#4094])
+- runtime: callback when a worker parks and unparks ([#4070])
+- sync: implement `try_recv` for mpsc channels ([#4113])
+
+### Changed
+
+- macros: run runtime inside `LocalSet` when using macro ([#4027])
+
+### Documented
+
+- docs: clarify CPU-bound tasks on Tokio ([#4105])
+- mpsc: document spurious failures on `poll_recv` ([#4117])
+- mpsc: document that `PollSender` impls `Sink` ([#4110])
+- task: document non-guarantees of `yield_now` ([#4091])
+- time: document paused time details better ([#4061], [#4103])
+
+[#4027]: https://github.com/tokio-rs/tokio/pull/4027
+[#4054]: https://github.com/tokio-rs/tokio/pull/4054
+[#4061]: https://github.com/tokio-rs/tokio/pull/4061
+[#4070]: https://github.com/tokio-rs/tokio/pull/4070
+[#4091]: https://github.com/tokio-rs/tokio/pull/4091
+[#4094]: https://github.com/tokio-rs/tokio/pull/4094
+[#4097]: https://github.com/tokio-rs/tokio/pull/4097
+[#4103]: https://github.com/tokio-rs/tokio/pull/4103
+[#4105]: https://github.com/tokio-rs/tokio/pull/4105
+[#4107]: https://github.com/tokio-rs/tokio/pull/4107
+[#4110]: https://github.com/tokio-rs/tokio/pull/4110
+[#4113]: https://github.com/tokio-rs/tokio/pull/4113
+[#4115]: https://github.com/tokio-rs/tokio/pull/4115
+[#4117]: https://github.com/tokio-rs/tokio/pull/4117
+[#4119]: https://github.com/tokio-rs/tokio/pull/4119
+
+# 1.11.0 (August 31, 2021)
+
+### Fixed
+
+ - time: don't panic when Instant is not monotonic ([#4044])
+ - io: fix panic in `fill_buf` by not calling `poll_fill_buf` twice ([#4084])
+
+### Added
+
+ - watch: add `watch::Sender::subscribe` ([#3800])
+ - process: add `from_std` to `ChildStd*` ([#4045])
+ - stats: initial work on runtime stats ([#4043])
+
+### Changed
+
+ - tracing: change span naming to new console convention ([#4042])
+ - io: speed-up waking by using uninitialized array ([#4055], [#4071], [#4075])
+
+### Documented
+
+ - time: make Sleep examples easier to find ([#4040])
+
+[#3800]: https://github.com/tokio-rs/tokio/pull/3800
+[#4040]: https://github.com/tokio-rs/tokio/pull/4040
+[#4042]: https://github.com/tokio-rs/tokio/pull/4042
+[#4043]: https://github.com/tokio-rs/tokio/pull/4043
+[#4044]: https://github.com/tokio-rs/tokio/pull/4044
+[#4045]: https://github.com/tokio-rs/tokio/pull/4045
+[#4055]: https://github.com/tokio-rs/tokio/pull/4055
+[#4071]: https://github.com/tokio-rs/tokio/pull/4071
+[#4075]: https://github.com/tokio-rs/tokio/pull/4075
+[#4084]: https://github.com/tokio-rs/tokio/pull/4084
+
# 1.10.1 (August 24, 2021)
### Fixed
diff --git a/Cargo.toml b/Cargo.toml
index 768583f..c09b22a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "tokio"
-version = "1.10.1"
+version = "1.12.0"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio/1.10.0/tokio/"
+documentation = "https://docs.rs/tokio/1.12.0/tokio/"
readme = "README.md"
keywords = ["io", "async", "non-blocking", "futures"]
categories = ["asynchronous", "network-programming"]
@@ -101,12 +101,16 @@ process = ["bytes", "once_cell", "libc", "mio/os-poll", "mio/os-util", "mio/uds"
rt = []
rt-multi-thread = ["num_cpus", "rt"]
signal = ["once_cell", "libc", "mio/os-poll", "mio/uds", "mio/os-util", "signal-hook-registry", "winapi/consoleapi"]
+stats = []
sync = []
test-util = ["rt", "sync", "time"]
time = []
[target."cfg(loom)".dev-dependencies.loom]
version = "0.5"
features = ["futures", "checkpoint"]
+[target."cfg(target_os = \"freebsd\")".dev-dependencies.mio-aio]
+version = "0.6.0"
+features = ["tokio"]
[target."cfg(tokio_unstable)".dependencies.tracing]
version = "0.1.21"
features = ["std"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 90455fb..d2e4696 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -7,12 +7,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
-version = "1.10.1"
+version = "1.12.0"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
-documentation = "https://docs.rs/tokio/1.10.0/tokio/"
+documentation = "https://docs.rs/tokio/1.12.0/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """
@@ -47,6 +47,7 @@ io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
macros = ["tokio-macros"]
+stats = []
net = [
"libc",
"mio/os-poll",
@@ -130,6 +131,9 @@ tempfile = "3.1.0"
async-stream = "0.3"
socket2 = "0.4"
+[target.'cfg(target_os = "freebsd")'.dev-dependencies]
+mio-aio = { version = "0.6.0", features = ["tokio"] }
+
[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5", features = ["futures", "checkpoint"] }
diff --git a/METADATA b/METADATA
index 4a93f8b..cd6df63 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio/tokio-1.10.1.crate"
+ value: "https://static.crates.io/crates/tokio/tokio-1.12.0.crate"
}
- version: "1.10.1"
+ version: "1.12.0"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 8
- day: 25
+ month: 9
+ day: 30
}
}
diff --git a/README.md b/README.md
index 80f75be..4d99c88 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:
```toml
[dependencies]
-tokio = { version = "1.10.1", features = ["full"] }
+tokio = { version = "1.12.0", features = ["full"] }
```
Then, on your main.rs:
diff --git a/patches/test_fix.patch b/patches/test_fix.patch
new file mode 100644
index 0000000..efc8c27
--- /dev/null
+++ b/patches/test_fix.patch
@@ -0,0 +1,21 @@
+diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs
+index a70f49b81a..f8a35d0ede 100644
+--- a/tests/task_local_set.rs
++++ b/tests/task_local_set.rs
+@@ -16,16 +16,6 @@ use std::sync::atomic::Ordering::{self, SeqCst};
+ use std::sync::atomic::{AtomicBool, AtomicUsize};
+ use std::time::Duration;
+
+-#[tokio::test(flavor = "current_thread")]
+-async fn localset_implicit_current_thread() {
+- task::spawn_local(async {}).await.unwrap();
+-}
+-
+-#[tokio::test(flavor = "multi_thread")]
+-async fn localset_implicit_multi_thread() {
+- task::spawn_local(async {}).await.unwrap();
+-}
+-
+ #[tokio::test(flavor = "current_thread")]
+ async fn local_basic_scheduler() {
+ LocalSet::new() \ No newline at end of file
diff --git a/src/doc/mod.rs b/src/doc/mod.rs
index 12c2247..3a94934 100644
--- a/src/doc/mod.rs
+++ b/src/doc/mod.rs
@@ -17,6 +17,7 @@
/// will ever accidentally use it.
///
/// [`never` type]: https://doc.rust-lang.org/std/primitive.never.html
+#[derive(Debug)]
pub enum NotDefinedHere {}
pub mod os;
diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs
new file mode 100644
index 0000000..a765d76
--- /dev/null
+++ b/src/io/bsd/poll_aio.rs
@@ -0,0 +1,195 @@
+//! Use POSIX AIO futures with Tokio
+
+use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
+use mio::event::Source;
+use mio::Registry;
+use mio::Token;
+use std::fmt;
+use std::io;
+use std::ops::{Deref, DerefMut};
+use std::os::unix::io::AsRawFd;
+use std::os::unix::prelude::RawFd;
+use std::task::{Context, Poll};
+
+/// Like [`mio::event::Source`], but for POSIX AIO only.
+///
+/// Tokio's consumer must pass an implementor of this trait to create a
+/// [`Aio`] object.
+pub trait AioSource {
+ /// Register this AIO event source with Tokio's reactor
+ fn register(&mut self, kq: RawFd, token: usize);
+
+ /// Deregister this AIO event source with Tokio's reactor
+ fn deregister(&mut self);
+}
+
+/// Wrap the user's AioSource in order to implement mio::event::Source, which
+/// is what the rest of the crate wants.
+struct MioSource<T>(T);
+
+impl<T: AioSource> Source for MioSource<T> {
+ fn register(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: mio::Interest,
+ ) -> io::Result<()> {
+ assert!(interests.is_aio() || interests.is_lio());
+ self.0.register(registry.as_raw_fd(), usize::from(token));
+ Ok(())
+ }
+
+ fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
+ self.0.deregister();
+ Ok(())
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &Registry,
+ token: Token,
+ interests: mio::Interest,
+ ) -> io::Result<()> {
+ assert!(interests.is_aio() || interests.is_lio());
+ self.0.register(registry.as_raw_fd(), usize::from(token));
+ Ok(())
+ }
+}
+
+/// Associates a POSIX AIO control block with the reactor that drives it.
+///
+/// `Aio`'s wrapped type must implement [`AioSource`] to be driven
+/// by the reactor.
+///
+/// The wrapped source may be accessed through the `Aio` via the `Deref` and
+/// `DerefMut` traits.
+///
+/// ## Clearing readiness
+///
+/// If [`Aio::poll_ready`] returns ready, but the consumer determines that the
+/// Source is not completely ready and must return to the Pending state,
+/// [`Aio::clear_ready`] may be used. This can be useful with
+/// [`lio_listio`], which may generate a kevent when only a portion of the
+/// operations have completed.
+///
+/// ## Platforms
+///
+/// Only FreeBSD implements POSIX AIO with kqueue notification, so
+/// `Aio` is only available for that operating system.
+///
+/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+// Note: Unlike every other kqueue event source, POSIX AIO registers events not
+// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
+// aio_write, etc. It needs the kqueue's file descriptor to do that. So
+// AsyncFd can't be used for POSIX AIO.
+//
+// Note that Aio doesn't implement Drop. There's no need. Unlike other
+// kqueue sources, simply dropping the object effectively deregisters it.
+pub struct Aio<E> {
+ io: MioSource<E>,
+ registration: Registration,
+}
+
+// ===== impl Aio =====
+
+impl<E: AioSource> Aio<E> {
+ /// Creates a new `Aio` suitable for use with POSIX AIO functions.
+ ///
+ /// It will be associated with the default reactor. The runtime is usually
+ /// set implicitly when this function is called from a future driven by a
+ /// Tokio runtime, otherwise runtime can be set explicitly with
+ /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn new_for_aio(io: E) -> io::Result<Self> {
+ Self::new_with_interest(io, Interest::AIO)
+ }
+
+ /// Creates a new `Aio` suitable for use with [`lio_listio`].
+ ///
+ /// It will be associated with the default reactor. The runtime is usually
+ /// set implicitly when this function is called from a future driven by a
+ /// Tokio runtime, otherwise runtime can be set explicitly with
+ /// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ ///
+ /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+ pub fn new_for_lio(io: E) -> io::Result<Self> {
+ Self::new_with_interest(io, Interest::LIO)
+ }
+
+ fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
+ let mut io = MioSource(io);
+ let handle = Handle::current();
+ let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
+ Ok(Self { io, registration })
+ }
+
+ /// Indicates to Tokio that the source is no longer ready. The internal
+ /// readiness flag will be cleared, and tokio will wait for the next
+ /// edge-triggered readiness notification from the OS.
+ ///
+ /// It is critical that this method not be called unless your code
+ /// _actually observes_ that the source is _not_ ready. The OS must
+ /// deliver a subsequent notification, or this source will block
+ /// forever. It is equally critical that you `do` call this method if you
+ /// resubmit the same structure to the kernel and poll it again.
+ ///
+ /// This method is not very useful with AIO readiness, since each `aiocb`
+ /// structure is typically only used once. Its main use is with
+ /// [`lio_listio`], which will sometimes send notification when only a
+ /// portion of its elements are complete. In that case, the caller must
+ /// call `clear_ready` before resubmitting it.
+ ///
+ /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
+ pub fn clear_ready(&self, ev: AioEvent) {
+ self.registration.clear_readiness(ev.0)
+ }
+
+ /// Destroy the [`Aio`] and return its inner source.
+ pub fn into_inner(self) -> E {
+ self.io.0
+ }
+
+ /// Polls for readiness. Either AIO or LIO counts.
+ ///
+ /// This method returns:
+ /// * `Poll::Pending` if the underlying operation is not complete, whether
+ /// or not it completed successfully. This will be true if the OS is
+ /// still processing it, or if it has not yet been submitted to the OS.
+ /// * `Poll::Ready(Ok(_))` if the underlying operation is complete.
+ /// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does
+ /// _not_ indicate that the underlying operation encountered an error.
+ ///
+ /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context`
+ /// is scheduled to receive a wakeup when the underlying operation
+ /// completes. Note that on multiple calls to `poll_ready`, only the `Waker` from the
+ /// `Context` passed to the most recent call is scheduled to receive a wakeup.
+ pub fn poll_ready<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
+ let ev = ready!(self.registration.poll_read_ready(cx))?;
+ Poll::Ready(Ok(AioEvent(ev)))
+ }
+}
+
+impl<E: AioSource> Deref for Aio<E> {
+ type Target = E;
+
+ fn deref(&self) -> &E {
+ &self.io.0
+ }
+}
+
+impl<E: AioSource> DerefMut for Aio<E> {
+ fn deref_mut(&mut self) -> &mut E {
+ &mut self.io.0
+ }
+}
+
+impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Aio").field("io", &self.io.0).finish()
+ }
+}
+
+/// Opaque data returned by [`Aio::poll_ready`].
+///
+/// It can be fed back to [`Aio::clear_ready`].
+#[derive(Debug)]
+pub struct AioEvent(ReadyEvent);
diff --git a/src/io/driver/interest.rs b/src/io/driver/interest.rs
index 36951cf..c5b18ed 100644
--- a/src/io/driver/interest.rs
+++ b/src/io/driver/interest.rs
@@ -14,6 +14,26 @@ use std::ops;
pub struct Interest(mio::Interest);
impl Interest {
+ // The non-FreeBSD definitions in this block are active only when
+ // building documentation.
+ cfg_aio! {
+ /// Interest for POSIX AIO
+ #[cfg(target_os = "freebsd")]
+ pub const AIO: Interest = Interest(mio::Interest::AIO);
+
+ /// Interest for POSIX AIO
+ #[cfg(not(target_os = "freebsd"))]
+ pub const AIO: Interest = Interest(mio::Interest::READABLE);
+
+ /// Interest for POSIX AIO lio_listio events
+ #[cfg(target_os = "freebsd")]
+ pub const LIO: Interest = Interest(mio::Interest::LIO);
+
+ /// Interest for POSIX AIO lio_listio events
+ #[cfg(not(target_os = "freebsd"))]
+ pub const LIO: Interest = Interest(mio::Interest::READABLE);
+ }
+
/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
index 3aa0cfb..1511884 100644
--- a/src/io/driver/mod.rs
+++ b/src/io/driver/mod.rs
@@ -51,6 +51,7 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}
+#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
diff --git a/src/io/driver/ready.rs b/src/io/driver/ready.rs
index 2ac01bd..305dc91 100644
--- a/src/io/driver/ready.rs
+++ b/src/io/driver/ready.rs
@@ -38,6 +38,17 @@ impl Ready {
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;
+ #[cfg(all(target_os = "freebsd", feature = "net"))]
+ {
+ if event.is_aio() {
+ ready |= Ready::READABLE;
+ }
+
+ if event.is_lio() {
+ ready |= Ready::READABLE;
+ }
+ }
+
if event.is_readable() {
ready |= Ready::READABLE;
}
diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs
index 5178010..a265720 100644
--- a/src/io/driver/scheduled_io.rs
+++ b/src/io/driver/scheduled_io.rs
@@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry;
+use crate::util::WakeList;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
@@ -212,10 +213,7 @@ impl ScheduledIo {
}
fn wake0(&self, ready: Ready, shutdown: bool) {
- const NUM_WAKERS: usize = 32;
-
- let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
- let mut curr = 0;
+ let mut wakers = WakeList::new();
let mut waiters = self.waiters.lock();
@@ -224,16 +222,14 @@ impl ScheduledIo {
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
- wakers[curr] = Some(waker);
- curr += 1;
+ wakers.push(waker);
}
}
// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
- wakers[curr] = Some(waker);
- curr += 1;
+ wakers.push(waker);
}
}
@@ -241,15 +237,14 @@ impl ScheduledIo {
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
- while curr < NUM_WAKERS {
+ while wakers.can_push() {
match iter.next() {
Some(waiter) => {
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
- wakers[curr] = Some(waker);
- curr += 1;
+ wakers.push(waker);
}
}
None => {
@@ -260,11 +255,7 @@ impl ScheduledIo {
drop(waiters);
- for waker in wakers.iter_mut().take(curr) {
- waker.take().unwrap().wake();
- }
-
- curr = 0;
+ wakers.wake_all();
// Acquire the lock again.
waiters = self.waiters.lock();
@@ -273,9 +264,7 @@ impl ScheduledIo {
// Release the lock before notifying
drop(waiters);
- for waker in wakers.iter_mut().take(curr) {
- waker.take().unwrap().wake();
- }
+ wakers.wake_all();
}
pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 14a4a63..a5ee108 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -217,6 +217,15 @@ cfg_io_driver_impl! {
pub(crate) use poll_evented::PollEvented;
}
+cfg_aio! {
+ /// BSD-specific I/O types
+ pub mod bsd {
+ mod poll_aio;
+
+ pub use poll_aio::{Aio, AioEvent, AioSource};
+ }
+}
+
cfg_net_unix! {
mod async_fd;
diff --git a/src/io/util/async_seek_ext.rs b/src/io/util/async_seek_ext.rs
index 297a4a6..46b3e6c 100644
--- a/src/io/util/async_seek_ext.rs
+++ b/src/io/util/async_seek_ext.rs
@@ -67,6 +67,16 @@ cfg_io_util! {
seek(self, pos)
}
+ /// Creates a future which will rewind to the beginning of the stream.
+ ///
+ /// This is a convenience method, equivalent to `self.seek(SeekFrom::Start(0))`.
+ fn rewind(&mut self) -> Seek<'_, Self>
+ where
+ Self: Unpin,
+ {
+ self.seek(SeekFrom::Start(0))
+ }
+
/// Creates a future which will return the current seek position from the
/// start of the stream.
///
diff --git a/src/io/util/fill_buf.rs b/src/io/util/fill_buf.rs
index 98ae2ea..3655c01 100644
--- a/src/io/util/fill_buf.rs
+++ b/src/io/util/fill_buf.rs
@@ -34,15 +34,16 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'a, R> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
- // Due to a limitation in the borrow-checker, we cannot return the value
- // directly on Ready. Once Rust starts using the polonius borrow checker,
- // this can be simplified.
let reader = me.reader.take().expect("Polled after completion.");
match Pin::new(&mut *reader).poll_fill_buf(cx) {
- Poll::Ready(_) => match Pin::new(reader).poll_fill_buf(cx) {
- Poll::Ready(slice) => Poll::Ready(slice),
- Poll::Pending => panic!("poll_fill_buf returned Pending while having data"),
+ Poll::Ready(Ok(slice)) => unsafe {
+ // Safety: This is necessary only due to a limitation in the
+ // borrow checker. Once Rust starts using the polonius borrow
+ // checker, this can be simplified.
+ let slice = std::mem::transmute::<&[u8], &'a [u8]>(slice);
+ Poll::Ready(Ok(slice))
},
+ Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => {
*me.reader = Some(reader);
Poll::Pending
diff --git a/src/lib.rs b/src/lib.rs
index fbf28ff..9689223 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,6 +16,7 @@
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
#![cfg_attr(docsrs, feature(doc_cfg))]
+#![cfg_attr(docsrs, allow(unused_attributes))]
//! A runtime for writing reliable network applications without compromising speed.
//!
@@ -204,9 +205,15 @@
//! ```
//!
//! If your code is CPU-bound and you wish to limit the number of threads used
-//! to run it, you should run it on another thread pool such as [rayon]. You
-//! can use an [`oneshot`] channel to send the result back to Tokio when the
-//! rayon task finishes.
+//! to run it, you should use a separate thread pool dedicated to CPU bound tasks.
+//! For example, you could consider using the [rayon] library for CPU-bound
+//! tasks. It is also possible to create an extra Tokio runtime dedicated to
+//! CPU-bound tasks, but if you do this, you should be careful that the extra
+//! runtime runs _only_ CPU-bound tasks, as IO-bound tasks on that runtime
+//! will behave poorly.
+//!
+//! Hint: If using rayon, you can use a [`oneshot`] channel to send the result back
+//! to Tokio when the rayon task finishes.
//!
//! [rayon]: https://docs.rs/rayon
//! [`oneshot`]: crate::sync::oneshot
@@ -307,8 +314,9 @@
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
//! - `io-util`: Enables the IO based `Ext` traits.
//! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types.
-//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`,
-//! as well as (on Unix-like systems) `AsyncFd`
+//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and
+//! `UdpSocket`, as well as (on Unix-like systems) `AsyncFd` and (on
+//! FreeBSD) `PollAio`.
//! - `time`: Enables `tokio::time` types and allows the schedulers to enable
//! the built in timer.
//! - `process`: Enables `tokio::process` types.
diff --git a/src/loom/std/mod.rs b/src/loom/std/mod.rs
index b29cbee..8b6e8bc 100644
--- a/src/loom/std/mod.rs
+++ b/src/loom/std/mod.rs
@@ -93,4 +93,17 @@ pub(crate) mod sys {
}
}
-pub(crate) use std::thread;
+pub(crate) mod thread {
+ #[inline]
+ pub(crate) fn yield_now() {
+ // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead.
+ #[allow(deprecated)]
+ std::sync::atomic::spin_loop_hint();
+ }
+
+ #[allow(unused_imports)]
+ pub(crate) use std::thread::{
+ current, panicking, park, park_timeout, sleep, spawn, Builder, JoinHandle, LocalKey,
+ Result, Thread, ThreadId,
+ };
+}
diff --git a/src/macros/cfg.rs b/src/macros/cfg.rs
index 7c87522..193bcd7 100644
--- a/src/macros/cfg.rs
+++ b/src/macros/cfg.rs
@@ -45,6 +45,18 @@ macro_rules! cfg_atomic_waker_impl {
}
}
+macro_rules! cfg_aio {
+ ($($item:item)*) => {
+ $(
+ #[cfg(all(any(docsrs, target_os = "freebsd"), feature = "net"))]
+ #[cfg_attr(docsrs,
+ doc(cfg(all(target_os = "freebsd", feature = "net")))
+ )]
+ $item
+ )*
+ }
+}
+
macro_rules! cfg_fs {
($($item:item)*) => {
$(
@@ -162,6 +174,25 @@ macro_rules! cfg_macros {
}
}
+macro_rules! cfg_stats {
+ ($($item:item)*) => {
+ $(
+ #[cfg(all(tokio_unstable, feature = "stats"))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "stats")))]
+ $item
+ )*
+ }
+}
+
+macro_rules! cfg_not_stats {
+ ($($item:item)*) => {
+ $(
+ #[cfg(not(all(tokio_unstable, feature = "stats")))]
+ $item
+ )*
+ }
+}
+
macro_rules! cfg_net {
($($item:item)*) => {
$(
@@ -176,7 +207,7 @@ macro_rules! cfg_net_unix {
($($item:item)*) => {
$(
#[cfg(all(unix, feature = "net"))]
- #[cfg_attr(docsrs, doc(cfg(feature = "net")))]
+ #[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))]
$item
)*
}
diff --git a/src/process/mod.rs b/src/process/mod.rs
index 42654b1..7a15024 100644
--- a/src/process/mod.rs
+++ b/src/process/mod.rs
@@ -225,9 +225,9 @@ pub struct Command {
pub(crate) struct SpawnedChild {
child: imp::Child,
- stdin: Option<imp::ChildStdin>,
- stdout: Option<imp::ChildStdout>,
- stderr: Option<imp::ChildStderr>,
+ stdin: Option<imp::ChildStdio>,
+ stdout: Option<imp::ChildStdio>,
+ stderr: Option<imp::ChildStdio>,
}
impl Command {
@@ -1151,7 +1151,7 @@ impl Child {
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdin {
- inner: imp::ChildStdin,
+ inner: imp::ChildStdio,
}
/// The standard output stream for spawned children.
@@ -1160,7 +1160,7 @@ pub struct ChildStdin {
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStdout {
- inner: imp::ChildStdout,
+ inner: imp::ChildStdio,
}
/// The standard error stream for spawned children.
@@ -1169,7 +1169,52 @@ pub struct ChildStdout {
/// handle of a child process asynchronously.
#[derive(Debug)]
pub struct ChildStderr {
- inner: imp::ChildStderr,
+ inner: imp::ChildStdio,
+}
+
+impl ChildStdin {
+ /// Create an asynchronous `ChildStdin` from a synchronous one.
+ ///
+ /// # Errors
+ ///
+ /// This method may fail if an error is encountered when setting the pipe to
+ /// non-blocking mode, or when registering the pipe with the runtime's IO
+ /// driver.
+ pub fn from_std(inner: std::process::ChildStdin) -> io::Result<Self> {
+ Ok(Self {
+ inner: imp::stdio(inner)?,
+ })
+ }
+}
+
+impl ChildStdout {
+ /// Create an asynchronous `ChildStdout` from a synchronous one.
+ ///
+ /// # Errors
+ ///
+ /// This method may fail if an error is encountered when setting the pipe to
+ /// non-blocking mode, or when registering the pipe with the runtime's IO
+ /// driver.
+ pub fn from_std(inner: std::process::ChildStdout) -> io::Result<Self> {
+ Ok(Self {
+ inner: imp::stdio(inner)?,
+ })
+ }
+}
+
+impl ChildStderr {
+ /// Create an asynchronous `ChildStderr` from a synchronous one.
+ ///
+ /// # Errors
+ ///
+ /// This method may fail if an error is encountered when setting the pipe to
+ /// non-blocking mode, or when registering the pipe with the runtime's IO
+ /// driver.
+ pub fn from_std(inner: std::process::ChildStderr) -> io::Result<Self> {
+ Ok(Self {
+ inner: imp::stdio(inner)?,
+ })
+ }
}
impl AsyncWrite for ChildStdin {
diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs
index fab63dd..0f379c9 100644
--- a/src/process/unix/mod.rs
+++ b/src/process/unix/mod.rs
@@ -101,9 +101,9 @@ impl fmt::Debug for Child {
pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> {
let mut child = cmd.spawn()?;
- let stdin = stdio(child.stdin.take())?;
- let stdout = stdio(child.stdout.take())?;
- let stderr = stdio(child.stderr.take())?;
+ let stdin = child.stdin.take().map(stdio).transpose()?;
+ let stdout = child.stdout.take().map(stdio).transpose()?;
+ let stderr = child.stderr.take().map(stdio).transpose()?;
let signal = signal(SignalKind::child())?;
@@ -213,9 +213,7 @@ impl Source for Pipe {
}
}
-pub(crate) type ChildStdin = PollEvented<Pipe>;
-pub(crate) type ChildStdout = PollEvented<Pipe>;
-pub(crate) type ChildStderr = PollEvented<Pipe>;
+pub(crate) type ChildStdio = PollEvented<Pipe>;
fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
unsafe {
@@ -240,18 +238,13 @@ fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>
Ok(())
}
-fn stdio<T>(option: Option<T>) -> io::Result<Option<PollEvented<Pipe>>>
+pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
where
T: IntoRawFd,
{
- let io = match option {
- Some(io) => io,
- None => return Ok(None),
- };
-
// Set the fd to nonblocking before we pass it to the event loop
let mut pipe = Pipe::from(io);
set_nonblocking(&mut pipe, true)?;
- Ok(Some(PollEvented::new(pipe)?))
+ PollEvented::new(pipe)
}
diff --git a/src/process/windows.rs b/src/process/windows.rs
index 06fc1b6..136d5b0 100644
--- a/src/process/windows.rs
+++ b/src/process/windows.rs
@@ -67,9 +67,9 @@ unsafe impl Send for Waiting {}
pub(crate) fn spawn_child(cmd: &mut StdCommand) -> io::Result<SpawnedChild> {
let mut child = cmd.spawn()?;
- let stdin = stdio(child.stdin.take());
- let stdout = stdio(child.stdout.take());
- let stderr = stdio(child.stderr.take());
+ let stdin = child.stdin.take().map(stdio).transpose()?;
+ let stdout = child.stdout.take().map(stdio).transpose()?;
+ let stderr = child.stderr.take().map(stdio).transpose()?;
Ok(SpawnedChild {
child: Child {
@@ -167,20 +167,14 @@ unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) {
let _ = complete.take().unwrap().send(());
}
-pub(crate) type ChildStdin = PollEvented<NamedPipe>;
-pub(crate) type ChildStdout = PollEvented<NamedPipe>;
-pub(crate) type ChildStderr = PollEvented<NamedPipe>;
+pub(crate) type ChildStdio = PollEvented<NamedPipe>;
-fn stdio<T>(option: Option<T>) -> Option<PollEvented<NamedPipe>>
+pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<NamedPipe>>
where
T: IntoRawHandle,
{
- let io = match option {
- Some(io) => io,
- None => return None,
- };
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
- PollEvented::new(pipe).ok()
+ PollEvented::new(pipe)
}
pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> {
diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs
index fe2e4a8..e37d872 100644
--- a/src/runtime/basic_scheduler.rs
+++ b/src/runtime/basic_scheduler.rs
@@ -2,7 +2,9 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
+use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
+use crate::runtime::Callback;
use crate::sync::notify::Notify;
use crate::util::{waker_ref, Wake, WakerRef};
@@ -47,6 +49,14 @@ struct Inner<P: Park> {
/// Thread park handle
park: P,
+
+ /// Callback for a worker parking itself
+ before_park: Option<Callback>,
+ /// Callback for a worker unparking itself
+ after_unpark: Option<Callback>,
+
+ /// Stats batcher
+ stats: WorkerStatsBatcher,
}
#[derive(Clone)]
@@ -87,6 +97,9 @@ struct Shared {
/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,
+
+ /// Keeps track of various runtime stats.
+ stats: RuntimeStats,
}
/// Thread-local context.
@@ -114,7 +127,11 @@ const REMOTE_FIRST_INTERVAL: u8 = 31;
scoped_thread_local!(static CURRENT: Context);
impl<P: Park> BasicScheduler<P> {
- pub(crate) fn new(park: P) -> BasicScheduler<P> {
+ pub(crate) fn new(
+ park: P,
+ before_park: Option<Callback>,
+ after_unpark: Option<Callback>,
+ ) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());
let spawner = Spawner {
@@ -123,6 +140,7 @@ impl<P: Park> BasicScheduler<P> {
owned: OwnedTasks::new(),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
+ stats: RuntimeStats::new(1),
}),
};
@@ -133,6 +151,9 @@ impl<P: Park> BasicScheduler<P> {
spawner: spawner.clone(),
tick: 0,
park,
+ before_park,
+ after_unpark,
+ stats: WorkerStatsBatcher::new(0),
}));
BasicScheduler {
@@ -205,6 +226,7 @@ impl<P: Park> Inner<P> {
'outer: loop {
if scheduler.spawner.was_woken() || !polled {
polled = true;
+ scheduler.stats.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}
@@ -237,8 +259,21 @@ impl<P: Park> Inner<P> {
let entry = match entry {
Some(entry) => entry,
None => {
- // Park until the thread is signaled
- scheduler.park.park().expect("failed to park");
+ if let Some(f) = &scheduler.before_park {
+ f();
+ }
+ // This check will fail if `before_park` spawns a task for us to run
+ // instead of parking the thread
+ if context.tasks.borrow_mut().queue.is_empty() {
+ // Park until the thread is signaled
+ scheduler.stats.about_to_park();
+ scheduler.stats.submit(&scheduler.spawner.shared.stats);
+ scheduler.park.park().expect("failed to park");
+ scheduler.stats.returned_from_park();
+ }
+ if let Some(f) = &scheduler.after_unpark {
+ f();
+ }
// Try polling the `block_on` future next
continue 'outer;
@@ -247,6 +282,7 @@ impl<P: Park> Inner<P> {
match entry {
RemoteMsg::Schedule(task) => {
+ scheduler.stats.incr_poll_count();
let task = context.shared.owned.assert_owner(task);
crate::coop::budget(|| task.run())
}
@@ -255,6 +291,7 @@ impl<P: Park> Inner<P> {
// Yield to the park, this drives the timer and pulls any pending
// I/O events.
+ scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler
.park
.park_timeout(Duration::from_millis(0))
@@ -369,6 +406,10 @@ impl Spawner {
handle
}
+ pub(crate) fn stats(&self) -> &RuntimeStats {
+ &self.shared.stats
+ }
+
fn pop(&self) -> Option<RemoteMsg> {
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs
index 51bf8c8..91c365f 100644
--- a/src/runtime/builder.rs
+++ b/src/runtime/builder.rs
@@ -70,6 +70,12 @@ pub struct Builder {
/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,
+ /// To run before each worker thread is parked.
+ pub(super) before_park: Option<Callback>,
+
+ /// To run after each thread is unparked.
+ pub(super) after_unpark: Option<Callback>,
+
/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,
}
@@ -135,6 +141,8 @@ impl Builder {
// No worker thread callbacks
after_start: None,
before_stop: None,
+ before_park: None,
+ after_unpark: None,
keep_alive: None,
}
@@ -374,6 +382,120 @@ impl Builder {
self
}
+ /// Executes function `f` just before a thread is parked (goes idle).
+ /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
+ /// can be called, and may result in this thread being unparked immediately.
+ ///
+ /// This can be used to start work only when the executor is idle, or for bookkeeping
+ /// and monitoring purposes.
+ ///
+ /// Note: There can only be one park callback for a runtime; calling this function
+ /// more than once replaces the last callback defined, rather than adding to it.
+ ///
+ /// # Examples
+ ///
+ /// ## Multithreaded executor
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use std::sync::atomic::{AtomicBool, Ordering};
+ /// # use tokio::runtime;
+ /// # use tokio::sync::Barrier;
+ /// # pub fn main() {
+ /// let once = AtomicBool::new(true);
+ /// let barrier = Arc::new(Barrier::new(2));
+ ///
+ /// let runtime = runtime::Builder::new_multi_thread()
+ /// .worker_threads(1)
+ /// .on_thread_park({
+ /// let barrier = barrier.clone();
+ /// move || {
+ /// let barrier = barrier.clone();
+ /// if once.swap(false, Ordering::Relaxed) {
+ /// tokio::spawn(async move { barrier.wait().await; });
+ /// }
+ /// }
+ /// })
+ /// .build()
+ /// .unwrap();
+ ///
+ /// runtime.block_on(async {
+ /// barrier.wait().await;
+ /// })
+ /// # }
+ /// ```
+ /// ## Current thread executor
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use std::sync::atomic::{AtomicBool, Ordering};
+ /// # use tokio::runtime;
+ /// # use tokio::sync::Barrier;
+ /// # pub fn main() {
+ /// let once = AtomicBool::new(true);
+ /// let barrier = Arc::new(Barrier::new(2));
+ ///
+ /// let runtime = runtime::Builder::new_current_thread()
+ /// .on_thread_park({
+ /// let barrier = barrier.clone();
+ /// move || {
+ /// let barrier = barrier.clone();
+ /// if once.swap(false, Ordering::Relaxed) {
+ /// tokio::spawn(async move { barrier.wait().await; });
+ /// }
+ /// }
+ /// })
+ /// .build()
+ /// .unwrap();
+ ///
+ /// runtime.block_on(async {
+ /// barrier.wait().await;
+ /// })
+ /// # }
+ /// ```
+ #[cfg(not(loom))]
+ pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.before_park = Some(std::sync::Arc::new(f));
+ self
+ }
+
+ /// Executes function `f` just after a thread unparks (starts executing tasks).
+ ///
+ /// This is intended for bookkeeping and monitoring use cases; note that work
+ /// in this callback will increase latencies when the application has allowed one or
+ /// more runtime threads to go idle.
+ ///
+ /// Note: There can only be one unpark callback for a runtime; calling this function
+ /// more than once replaces the last callback defined, rather than adding to it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let runtime = runtime::Builder::new_multi_thread()
+ /// .on_thread_unpark(|| {
+ /// println!("thread unparking");
+ /// })
+ /// .build();
+ ///
+ /// runtime.unwrap().block_on(async {
+ /// tokio::task::yield_now().await;
+ /// println!("Hello from Tokio!");
+ /// })
+ /// # }
+ /// ```
+ #[cfg(not(loom))]
+ pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.after_unpark = Some(std::sync::Arc::new(f));
+ self
+ }
+
/// Creates the configured `Runtime`.
///
/// The returned `Runtime` instance is ready to spawn tasks.
@@ -441,7 +563,8 @@ impl Builder {
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
- let scheduler = BasicScheduler::new(driver);
+ let scheduler =
+ BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::Basic(scheduler.spawner().clone());
// Blocking pool
@@ -546,7 +669,7 @@ cfg_rt_multi_thread! {
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
- let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
+ let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
// Create the blocking pool
@@ -587,7 +710,9 @@ impl fmt::Debug for Builder {
)
.field("thread_stack_size", &self.thread_stack_size)
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
- .field("before_stop", &self.after_start.as_ref().map(|_| "..."))
+ .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
+ .field("before_park", &self.before_park.as_ref().map(|_| "..."))
+ .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
.finish()
}
}
diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs
index ddc170a..bad6a00 100644
--- a/src/runtime/handle.rs
+++ b/src/runtime/handle.rs
@@ -111,6 +111,14 @@ impl Handle {
context::current().ok_or(TryCurrentError(()))
}
+ cfg_stats! {
+ /// Returns a view that lets you get information about how the runtime
+ /// is performing.
+ pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats {
+ self.spawner.stats()
+ }
+ }
+
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
@@ -192,20 +200,20 @@ impl Handle {
let location = std::panic::Location::caller();
#[cfg(tokio_track_caller)]
let span = tracing::trace_span!(
- target: "tokio::task",
- "task",
+ target: "tokio::task::blocking",
+ "runtime.spawn",
kind = %"blocking",
- function = %std::any::type_name::<F>(),
task.name = %name.unwrap_or_default(),
+ "fn" = %std::any::type_name::<F>(),
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
);
#[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
- target: "tokio::task",
- "task",
+ target: "tokio::task::blocking",
+ "runtime.spawn",
kind = %"blocking",
task.name = %name.unwrap_or_default(),
- function = %std::any::type_name::<F>(),
+ "fn" = %std::any::type_name::<F>(),
);
fut.instrument(span)
};
@@ -288,7 +296,11 @@ impl Handle {
/// [`tokio::fs`]: crate::fs
/// [`tokio::net`]: crate::net
/// [`tokio::time`]: crate::time
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let future = crate::util::trace::task(future, "block_on", None);
+
// Enter the **runtime** context. This configures spawning, the current I/O driver, ...
let _rt_enter = self.enter();
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 52532ec..ec7d0c0 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -181,6 +181,13 @@ pub(crate) mod enter;
pub(crate) mod task;
+cfg_stats! {
+ pub mod stats;
+}
+cfg_not_stats! {
+ pub(crate) mod stats;
+}
+
cfg_rt! {
mod basic_scheduler;
use basic_scheduler::BasicScheduler;
@@ -443,7 +450,11 @@ cfg_rt! {
/// ```
///
/// [handle]: fn@Handle::block_on
+ #[cfg_attr(tokio_track_caller, track_caller)]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let future = crate::util::trace::task(future, "block_on", None);
+
let _enter = self.enter();
match &self.kind {
diff --git a/src/runtime/queue.rs b/src/runtime/queue.rs
index c45cb6a..a88dffc 100644
--- a/src/runtime/queue.rs
+++ b/src/runtime/queue.rs
@@ -3,6 +3,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
use crate::loom::sync::Arc;
+use crate::runtime::stats::WorkerStatsBatcher;
use crate::runtime::task::{self, Inject};
use std::mem::MaybeUninit;
@@ -92,6 +93,14 @@ impl<T> Local<T> {
!self.inner.is_empty()
}
+ /// Returns true if there are any entries in the queue
+ ///
+ /// Separate to is_stealable so that refactors of is_stealable to "protect"
+ /// some tasks from stealing won't affect this
+ pub(super) fn has_tasks(&self) -> bool {
+ !self.inner.is_empty()
+ }
+
/// Pushes a task to the back of the local queue, skipping the LIFO slot.
pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
let tail = loop {
@@ -288,7 +297,11 @@ impl<T> Steal<T> {
}
/// Steals half the tasks from self and place them into `dst`.
- pub(super) fn steal_into(&self, dst: &mut Local<T>) -> Option<task::Notified<T>> {
+ pub(super) fn steal_into(
+ &self,
+ dst: &mut Local<T>,
+ stats: &mut WorkerStatsBatcher,
+ ) -> Option<task::Notified<T>> {
// Safety: the caller is the only thread that mutates `dst.tail` and
// holds a mutable reference.
let dst_tail = unsafe { dst.inner.tail.unsync_load() };
@@ -307,6 +320,7 @@ impl<T> Steal<T> {
// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
+ stats.incr_steal_count(n);
if n == 0 {
// No tasks were stolen
diff --git a/src/runtime/spawner.rs b/src/runtime/spawner.rs
index fbcde2c..9a3d465 100644
--- a/src/runtime/spawner.rs
+++ b/src/runtime/spawner.rs
@@ -1,8 +1,7 @@
-cfg_rt! {
- use crate::future::Future;
- use crate::runtime::basic_scheduler;
- use crate::task::JoinHandle;
-}
+use crate::future::Future;
+use crate::runtime::basic_scheduler;
+use crate::runtime::stats::RuntimeStats;
+use crate::task::JoinHandle;
cfg_rt_multi_thread! {
use crate::runtime::thread_pool;
@@ -10,7 +9,6 @@ cfg_rt_multi_thread! {
#[derive(Debug, Clone)]
pub(crate) enum Spawner {
- #[cfg(feature = "rt")]
Basic(basic_scheduler::Spawner),
#[cfg(feature = "rt-multi-thread")]
ThreadPool(thread_pool::Spawner),
@@ -25,21 +23,25 @@ impl Spawner {
}
}
}
-}
-cfg_rt! {
- impl Spawner {
- pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- match self {
- #[cfg(feature = "rt")]
- Spawner::Basic(spawner) => spawner.spawn(future),
- #[cfg(feature = "rt-multi-thread")]
- Spawner::ThreadPool(spawner) => spawner.spawn(future),
- }
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ match self {
+ Spawner::Basic(spawner) => spawner.spawn(future),
+ #[cfg(feature = "rt-multi-thread")]
+ Spawner::ThreadPool(spawner) => spawner.spawn(future),
+ }
+ }
+
+ #[cfg_attr(not(all(tokio_unstable, feature = "stats")), allow(dead_code))]
+ pub(crate) fn stats(&self) -> &RuntimeStats {
+ match self {
+ Spawner::Basic(spawner) => spawner.stats(),
+ #[cfg(feature = "rt-multi-thread")]
+ Spawner::ThreadPool(spawner) => spawner.stats(),
}
}
}
diff --git a/src/runtime/stats/mock.rs b/src/runtime/stats/mock.rs
new file mode 100644
index 0000000..3bda8bf
--- /dev/null
+++ b/src/runtime/stats/mock.rs
@@ -0,0 +1,27 @@
+//! This file contains mocks of the types in src/runtime/stats/stats.rs
+
+pub(crate) struct RuntimeStats {}
+
+impl RuntimeStats {
+ pub(crate) fn new(_worker_threads: usize) -> Self {
+ Self {}
+ }
+}
+
+pub(crate) struct WorkerStatsBatcher {}
+
+impl WorkerStatsBatcher {
+ pub(crate) fn new(_my_index: usize) -> Self {
+ Self {}
+ }
+
+ pub(crate) fn submit(&mut self, _to: &RuntimeStats) {}
+
+ pub(crate) fn about_to_park(&mut self) {}
+ pub(crate) fn returned_from_park(&mut self) {}
+
+ #[cfg(feature = "rt-multi-thread")]
+ pub(crate) fn incr_steal_count(&mut self, _by: u16) {}
+
+ pub(crate) fn incr_poll_count(&mut self) {}
+}
diff --git a/src/runtime/stats/mod.rs b/src/runtime/stats/mod.rs
new file mode 100644
index 0000000..5e08e8e
--- /dev/null
+++ b/src/runtime/stats/mod.rs
@@ -0,0 +1,17 @@
+//! This module contains information needed to view information about how the
+//! runtime is performing.
+#![allow(clippy::module_inception)]
+
+cfg_stats! {
+ mod stats;
+
+ pub use self::stats::{RuntimeStats, WorkerStats};
+ pub(crate) use self::stats::WorkerStatsBatcher;
+}
+
+cfg_not_stats! {
+ #[path = "mock.rs"]
+ mod stats;
+
+ pub(crate) use self::stats::{RuntimeStats, WorkerStatsBatcher};
+}
diff --git a/src/runtime/stats/stats.rs b/src/runtime/stats/stats.rs
new file mode 100644
index 0000000..39a48ae
--- /dev/null
+++ b/src/runtime/stats/stats.rs
@@ -0,0 +1,97 @@
+//! This file contains the types necessary to collect various types of stats.
+use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
+
+/// This type contains methods to retrieve stats from a Tokio runtime.
+#[derive(Debug)]
+pub struct RuntimeStats {
+ workers: Box<[WorkerStats]>,
+}
+
+/// This type contains methods to retrieve stats from a worker thread on a Tokio runtime.
+#[derive(Debug)]
+#[repr(align(128))]
+pub struct WorkerStats {
+ park_count: AtomicU64,
+ steal_count: AtomicU64,
+ poll_count: AtomicU64,
+}
+
+impl RuntimeStats {
+ pub(crate) fn new(worker_threads: usize) -> Self {
+ let mut workers = Vec::with_capacity(worker_threads);
+ for _ in 0..worker_threads {
+ workers.push(WorkerStats {
+ park_count: AtomicU64::new(0),
+ steal_count: AtomicU64::new(0),
+ poll_count: AtomicU64::new(0),
+ });
+ }
+
+ Self {
+ workers: workers.into_boxed_slice(),
+ }
+ }
+
+ /// Returns a slice containing the worker stats for each worker thread.
+ pub fn workers(&self) -> impl Iterator<Item = &WorkerStats> {
+ self.workers.iter()
+ }
+}
+
+impl WorkerStats {
+ /// Returns the total number of times this worker thread has parked.
+ pub fn park_count(&self) -> u64 {
+ self.park_count.load(Relaxed)
+ }
+
+ /// Returns the number of tasks this worker has stolen from other worker
+ /// threads.
+ pub fn steal_count(&self) -> u64 {
+ self.steal_count.load(Relaxed)
+ }
+
+ /// Returns the number of times this worker has polled a task.
+ pub fn poll_count(&self) -> u64 {
+ self.poll_count.load(Relaxed)
+ }
+}
+
+pub(crate) struct WorkerStatsBatcher {
+ my_index: usize,
+ park_count: u64,
+ steal_count: u64,
+ poll_count: u64,
+}
+
+impl WorkerStatsBatcher {
+ pub(crate) fn new(my_index: usize) -> Self {
+ Self {
+ my_index,
+ park_count: 0,
+ steal_count: 0,
+ poll_count: 0,
+ }
+ }
+ pub(crate) fn submit(&mut self, to: &RuntimeStats) {
+ let worker = &to.workers[self.my_index];
+
+ worker.park_count.store(self.park_count, Relaxed);
+ worker.steal_count.store(self.steal_count, Relaxed);
+ worker.poll_count.store(self.poll_count, Relaxed);
+ }
+
+ pub(crate) fn about_to_park(&mut self) {
+ self.park_count += 1;
+ }
+
+ pub(crate) fn returned_from_park(&mut self) {}
+
+ #[cfg(feature = "rt-multi-thread")]
+ pub(crate) fn incr_steal_count(&mut self, by: u16) {
+ self.steal_count += u64::from(by);
+ }
+
+ pub(crate) fn incr_poll_count(&mut self) {
+ self.poll_count += 1;
+ }
+}
diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs
index 2fe40a7..0abbff2 100644
--- a/src/runtime/task/join.rs
+++ b/src/runtime/task/join.rs
@@ -162,7 +162,7 @@ impl<T> JoinHandle<T> {
///
/// Awaiting a cancelled task might complete as usual if the task was
/// already completed at the time it was cancelled, but most likely it
- /// will complete with a `Err(JoinError::Cancelled)`.
+ /// will fail with a [cancelled] `JoinError`.
///
/// ```rust
/// use tokio::time;
@@ -190,6 +190,7 @@ impl<T> JoinHandle<T> {
/// }
/// }
/// ```
+ /// [cancelled]: method@super::error::JoinError::is_cancelled
pub fn abort(&self) {
if let Some(raw) = self.raw {
raw.remote_abort();
diff --git a/src/runtime/tests/loom_queue.rs b/src/runtime/tests/loom_queue.rs
index a1ed171..2cbb0a1 100644
--- a/src/runtime/tests/loom_queue.rs
+++ b/src/runtime/tests/loom_queue.rs
@@ -1,5 +1,6 @@
use crate::runtime::blocking::NoopSchedule;
use crate::runtime::queue;
+use crate::runtime::stats::WorkerStatsBatcher;
use crate::runtime::task::Inject;
use loom::thread;
@@ -11,11 +12,12 @@ fn basic() {
let inject = Inject::new();
let th = thread::spawn(move || {
+ let mut stats = WorkerStatsBatcher::new(0);
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..3 {
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -65,10 +67,11 @@ fn steal_overflow() {
let inject = Inject::new();
let th = thread::spawn(move || {
+ let mut stats = WorkerStatsBatcher::new(0);
let (_, mut local) = queue::local();
let mut n = 0;
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -113,9 +116,10 @@ fn multi_stealer() {
const NUM_TASKS: usize = 5;
fn steal_tasks(steal: queue::Steal<NoopSchedule>) -> usize {
+ let mut stats = WorkerStatsBatcher::new(0);
let (_, mut local) = queue::local();
- if steal.steal_into(&mut local).is_none() {
+ if steal.steal_into(&mut local, &mut stats).is_none() {
return 0;
}
@@ -165,6 +169,7 @@ fn multi_stealer() {
#[test]
fn chained_steal() {
loom::model(|| {
+ let mut stats = WorkerStatsBatcher::new(0);
let (s1, mut l1) = queue::local();
let (s2, mut l2) = queue::local();
let inject = Inject::new();
@@ -180,8 +185,9 @@ fn chained_steal() {
// Spawn a task to steal from **our** queue
let th = thread::spawn(move || {
+ let mut stats = WorkerStatsBatcher::new(0);
let (_, mut local) = queue::local();
- s1.steal_into(&mut local);
+ s1.steal_into(&mut local, &mut stats);
while local.pop().is_some() {}
});
@@ -189,7 +195,7 @@ fn chained_steal() {
// Drain our tasks, then attempt to steal
while l1.pop().is_some() {}
- s2.steal_into(&mut l1);
+ s2.steal_into(&mut l1, &mut stats);
th.join().unwrap();
diff --git a/src/runtime/tests/queue.rs b/src/runtime/tests/queue.rs
index 428b002..47f1b01 100644
--- a/src/runtime/tests/queue.rs
+++ b/src/runtime/tests/queue.rs
@@ -1,4 +1,5 @@
use crate::runtime::queue;
+use crate::runtime::stats::WorkerStatsBatcher;
use crate::runtime::task::{self, Inject, Schedule, Task};
use std::thread;
@@ -44,6 +45,8 @@ fn overflow() {
#[test]
fn steal_batch() {
+ let mut stats = WorkerStatsBatcher::new(0);
+
let (steal1, mut local1) = queue::local();
let (_, mut local2) = queue::local();
let inject = Inject::new();
@@ -53,7 +56,7 @@ fn steal_batch() {
local1.push_back(task, &inject);
}
- assert!(steal1.steal_into(&mut local2).is_some());
+ assert!(steal1.steal_into(&mut local2, &mut stats).is_some());
for _ in 0..1 {
assert!(local2.pop().is_some());
@@ -81,11 +84,12 @@ fn stress1() {
let inject = Inject::new();
let th = thread::spawn(move || {
+ let mut stats = WorkerStatsBatcher::new(0);
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..NUM_STEAL {
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
@@ -137,11 +141,12 @@ fn stress2() {
let inject = Inject::new();
let th = thread::spawn(move || {
+ let mut stats = WorkerStatsBatcher::new(0);
let (_, mut local) = queue::local();
let mut n = 0;
for _ in 0..NUM_STEAL {
- if steal.steal_into(&mut local).is_some() {
+ if steal.steal_into(&mut local, &mut stats).is_some() {
n += 1;
}
diff --git a/src/runtime/thread_pool/mod.rs b/src/runtime/thread_pool/mod.rs
index 3808aa2..f2e68f6 100644
--- a/src/runtime/thread_pool/mod.rs
+++ b/src/runtime/thread_pool/mod.rs
@@ -12,8 +12,9 @@ pub(crate) use worker::Launch;
pub(crate) use worker::block_in_place;
use crate::loom::sync::Arc;
+use crate::runtime::stats::RuntimeStats;
use crate::runtime::task::JoinHandle;
-use crate::runtime::Parker;
+use crate::runtime::{Callback, Parker};
use std::fmt;
use std::future::Future;
@@ -43,8 +44,13 @@ pub(crate) struct Spawner {
// ===== impl ThreadPool =====
impl ThreadPool {
- pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) {
- let (shared, launch) = worker::create(size, parker);
+ pub(crate) fn new(
+ size: usize,
+ parker: Parker,
+ before_park: Option<Callback>,
+ after_unpark: Option<Callback>,
+ ) -> (ThreadPool, Launch) {
+ let (shared, launch) = worker::create(size, parker, before_park, after_unpark);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };
@@ -99,6 +105,10 @@ impl Spawner {
pub(crate) fn shutdown(&mut self) {
self.shared.close();
}
+
+ pub(crate) fn stats(&self) -> &RuntimeStats {
+ self.shared.stats()
+ }
}
impl fmt::Debug for Spawner {
diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs
index f5004c0..44f8db8 100644
--- a/src/runtime/thread_pool/worker.rs
+++ b/src/runtime/thread_pool/worker.rs
@@ -64,9 +64,10 @@ use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
+use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
-use crate::runtime::{queue, task};
+use crate::runtime::{queue, task, Callback};
use crate::util::FastRand;
use std::cell::RefCell;
@@ -112,6 +113,9 @@ struct Core {
/// borrow checker happy.
park: Option<Parker>,
+ /// Batching stats so they can be submitted to RuntimeStats.
+ stats: WorkerStatsBatcher,
+
/// Fast random number generator.
rand: FastRand,
}
@@ -137,6 +141,14 @@ pub(super) struct Shared {
/// stolen by a thread that was spawned as part of `block_in_place`.
#[allow(clippy::vec_box)] // we're moving an already-boxed value
shutdown_cores: Mutex<Vec<Box<Core>>>,
+
+ /// Callback for a worker parking itself
+ before_park: Option<Callback>,
+ /// Callback for a worker unparking itself
+ after_unpark: Option<Callback>,
+
+ /// Collect stats from the runtime.
+ stats: RuntimeStats,
}
/// Used to communicate with a worker from other threads.
@@ -174,12 +186,17 @@ type Notified = task::Notified<Arc<Shared>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);
-pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
+pub(super) fn create(
+ size: usize,
+ park: Parker,
+ before_park: Option<Callback>,
+ after_unpark: Option<Callback>,
+) -> (Arc<Shared>, Launch) {
let mut cores = vec![];
let mut remotes = vec![];
// Create the local queues
- for _ in 0..size {
+ for i in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
@@ -192,6 +209,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
is_searching: false,
is_shutdown: false,
park: Some(park),
+ stats: WorkerStatsBatcher::new(i),
rand: FastRand::new(seed()),
}));
@@ -204,6 +222,9 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
+ before_park,
+ after_unpark,
+ stats: RuntimeStats::new(size),
});
let mut launch = Launch(vec![]);
@@ -391,6 +412,7 @@ impl Context {
core.transition_from_searching(&self.worker);
// Make the core available to the runtime context
+ core.stats.incr_poll_count();
*self.core.borrow_mut() = Some(core);
// Run the task
@@ -415,6 +437,7 @@ impl Context {
if coop::has_budget_remaining() {
// Run the LIFO task, then loop
+ core.stats.incr_poll_count();
*self.core.borrow_mut() = Some(core);
let task = self.worker.shared.owned.assert_owner(task);
task.run();
@@ -442,19 +465,26 @@ impl Context {
}
fn park(&self, mut core: Box<Core>) -> Box<Core> {
- core.transition_to_parked(&self.worker);
+ if let Some(f) = &self.worker.shared.before_park {
+ f();
+ }
- while !core.is_shutdown {
- core = self.park_timeout(core, None);
+ if core.transition_to_parked(&self.worker) {
+ while !core.is_shutdown {
+ core = self.park_timeout(core, None);
- // Run regularly scheduled maintenance
- core.maintenance(&self.worker);
+ // Run regularly scheduled maintenance
+ core.maintenance(&self.worker);
- if core.transition_from_parked(&self.worker) {
- return core;
+ if core.transition_from_parked(&self.worker) {
+ break;
+ }
}
}
+ if let Some(f) = &self.worker.shared.after_unpark {
+ f();
+ }
core
}
@@ -462,6 +492,8 @@ impl Context {
// Take the parker out of core
let mut park = core.park.take().expect("park missing");
+ core.stats.about_to_park();
+
// Store `core` in context
*self.core.borrow_mut() = Some(core);
@@ -483,6 +515,8 @@ impl Context {
self.worker.shared.notify_parked();
}
+ core.stats.returned_from_park();
+
core
}
}
@@ -524,7 +558,10 @@ impl Core {
}
let target = &worker.shared.remotes[i];
- if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
+ if let Some(task) = target
+ .steal
+ .steal_into(&mut self.run_queue, &mut self.stats)
+ {
return Some(task);
}
}
@@ -551,7 +588,14 @@ impl Core {
}
/// Prepare the worker state for parking
- fn transition_to_parked(&mut self, worker: &Worker) {
+ ///
+ /// Returns true if the transition happened, false if there is work to do first
+ fn transition_to_parked(&mut self, worker: &Worker) -> bool {
+ // Workers should not park if they have work to do
+ if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
+ return false;
+ }
+
// When the final worker transitions **out** of searching to parked, it
// must check all the queues one last time in case work materialized
// between the last work scan and transitioning out of searching.
@@ -567,6 +611,8 @@ impl Core {
if is_last_searcher {
worker.shared.notify_if_work_pending();
}
+
+ true
}
/// Returns `true` if the transition happened.
@@ -590,6 +636,8 @@ impl Core {
/// Runs maintenance work such as checking the pool's state.
fn maintenance(&mut self, worker: &Worker) {
+ self.stats.submit(&worker.shared.stats);
+
if !self.is_shutdown {
// Check if the scheduler has been shutdown
self.is_shutdown = worker.inject().is_closed();
@@ -601,6 +649,8 @@ impl Core {
fn pre_shutdown(&mut self, worker: &Worker) {
// Signal to all tasks to shut down.
worker.shared.owned.close_and_shutdown_all();
+
+ self.stats.submit(&worker.shared.stats);
}
/// Shutdown the core
@@ -651,6 +701,10 @@ impl Shared {
handle
}
+ pub(crate) fn stats(&self) -> &RuntimeStats {
+ &self.stats
+ }
+
pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
CURRENT.with(|maybe_cx| {
if let Some(cx) = maybe_cx {
diff --git a/src/sync/batch_semaphore.rs b/src/sync/batch_semaphore.rs
index 872d53e..9b43404 100644
--- a/src/sync/batch_semaphore.rs
+++ b/src/sync/batch_semaphore.rs
@@ -19,6 +19,7 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::util::linked_list::{self, LinkedList};
+use crate::util::WakeList;
use std::future::Future;
use std::marker::PhantomPinned;
@@ -239,12 +240,12 @@ impl Semaphore {
/// If `rem` exceeds the number of permits needed by the wait list, the
/// remainder are assigned back to the semaphore.
fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
- let mut wakers: [Option<Waker>; 8] = Default::default();
+ let mut wakers = WakeList::new();
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
- 'inner: for slot in &mut wakers[..] {
+ 'inner: while wakers.can_push() {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
Some(waiter) => {
@@ -260,7 +261,11 @@ impl Semaphore {
}
};
let mut waiter = waiters.queue.pop_back().unwrap();
- *slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
+ if let Some(waker) =
+ unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
+ {
+ wakers.push(waker);
+ }
}
if rem > 0 && is_empty {
@@ -283,10 +288,7 @@ impl Semaphore {
drop(waiters); // release the lock
- wakers
- .iter_mut()
- .filter_map(Option::take)
- .for_each(Waker::wake);
+ wakers.wake_all();
}
assert_eq!(rem, 0);
diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs
index 7a0873b..6e7b700 100644
--- a/src/sync/mpsc/block.rs
+++ b/src/sync/mpsc/block.rs
@@ -343,13 +343,7 @@ impl<T> Block<T> {
Err(curr) => curr,
};
- #[cfg(all(test, loom))]
crate::loom::thread::yield_now();
-
- // TODO: once we bump MSRV to 1.49+, use `hint::spin_loop` instead.
- #[cfg(not(all(test, loom)))]
- #[allow(deprecated)]
- std::sync::atomic::spin_loop_hint();
}
}
}
diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs
index d7af172..bcad84d 100644
--- a/src/sync/mpsc/bounded.rs
+++ b/src/sync/mpsc/bounded.rs
@@ -1,6 +1,6 @@
use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
-use crate::sync::mpsc::error::{SendError, TrySendError};
+use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
@@ -14,8 +14,8 @@ use std::task::{Context, Poll};
///
/// Instances are created by the [`channel`](channel) function.
///
-/// To use the `Sender` in a poll function, you can use the [`PollSender`]
-/// utility.
+/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
+/// use the [`PollSender`] utility.
///
/// [`PollSender`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
@@ -187,6 +187,50 @@ impl<T> Receiver<T> {
poll_fn(|cx| self.chan.recv(cx)).await
}
+ /// Try to receive the next value for this receiver.
+ ///
+ /// This method returns the [`Empty`] error if the channel is currently
+ /// empty, but there are still outstanding [senders] or [permits].
+ ///
+ /// This method returns the [`Disconnected`] error if the channel is
+ /// currently empty, and there are no outstanding [senders] or [permits].
+ ///
+ /// Unlike the [`poll_recv`] method, this method will never return an
+ /// [`Empty`] error spuriously.
+ ///
+ /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
+ /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
+ /// [`poll_recv`]: Self::poll_recv
+ /// [senders]: crate::sync::mpsc::Sender
+ /// [permits]: crate::sync::mpsc::Permit
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ /// use tokio::sync::mpsc::error::TryRecvError;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(100);
+ ///
+ /// tx.send("hello").await.unwrap();
+ ///
+ /// assert_eq!(Ok("hello"), rx.try_recv());
+ /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+ ///
+ /// tx.send("hello").await.unwrap();
+ /// // Drop the last sender, closing the channel.
+ /// drop(tx);
+ ///
+ /// assert_eq!(Ok("hello"), rx.try_recv());
+ /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+ /// }
+ /// ```
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
/// Blocking receive to call outside of asynchronous contexts.
///
/// This method returns `None` if the channel has been closed and there are
@@ -291,7 +335,7 @@ impl<T> Receiver<T> {
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not
- /// closed.
+ /// closed, or if a spurious failure happens.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages
/// sent before it was closed have been received.
@@ -301,6 +345,12 @@ impl<T> Receiver<T> {
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
+ ///
+ /// If this method returns `Poll::Pending` due to a spurious failure, then
+ /// the `Waker` will be notified when the situation causing the spurious
+ /// failure has been resolved. Note that receiving such a wakeup does not
+ /// guarantee that the next call will succeed — it could fail with another
+ /// spurious failure.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -811,7 +861,8 @@ impl<T> Sender<T> {
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
match self.chan.semaphore().0.try_acquire(1) {
Ok(_) => {}
- Err(_) => return Err(TrySendError::Full(())),
+ Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
+ Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
}
Ok(Permit { chan: &self.chan })
@@ -875,7 +926,8 @@ impl<T> Sender<T> {
pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
match self.chan.semaphore().0.try_acquire(1) {
Ok(_) => {}
- Err(_) => return Err(TrySendError::Full(self)),
+ Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
+ Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
}
Ok(OwnedPermit {
diff --git a/src/sync/mpsc/chan.rs b/src/sync/mpsc/chan.rs
index 554d022..637ae1f 100644
--- a/src/sync/mpsc/chan.rs
+++ b/src/sync/mpsc/chan.rs
@@ -2,6 +2,9 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
+use crate::park::thread::CachedParkThread;
+use crate::park::Park;
+use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
use crate::sync::notify::Notify;
@@ -263,6 +266,51 @@ impl<T, S: Semaphore> Rx<T, S> {
}
})
}
+
+ /// Try to receive the next value.
+ pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ use super::list::TryPopResult;
+
+ self.inner.rx_fields.with_mut(|rx_fields_ptr| {
+ let rx_fields = unsafe { &mut *rx_fields_ptr };
+
+ macro_rules! try_recv {
+ () => {
+ match rx_fields.list.try_pop(&self.inner.tx) {
+ TryPopResult::Ok(value) => {
+ self.inner.semaphore.add_permit();
+ return Ok(value);
+ }
+ TryPopResult::Closed => return Err(TryRecvError::Disconnected),
+ TryPopResult::Empty => return Err(TryRecvError::Empty),
+ TryPopResult::Busy => {} // fall through
+ }
+ };
+ }
+
+ try_recv!();
+
+ // If a previous `poll_recv` call has set a waker, we wake it here.
+ // This allows us to put our own CachedParkThread waker in the
+ // AtomicWaker slot instead.
+ //
+ // This is not a spurious wakeup to `poll_recv` since we just got a
+ // Busy from `try_pop`, which only happens if there are messages in
+ // the queue.
+ self.inner.rx_waker.wake();
+
+ // Park the thread until the problematic send has completed.
+ let mut park = CachedParkThread::new();
+ let waker = park.unpark().into_waker();
+ loop {
+ self.inner.rx_waker.register_by_ref(&waker);
+ // It is possible that the problematic send has now completed,
+ // so we have to check for messages again.
+ try_recv!();
+ park.park().expect("park failed");
+ }
+ })
+ }
}
impl<T, S: Semaphore> Drop for Rx<T, S> {
diff --git a/src/sync/mpsc/error.rs b/src/sync/mpsc/error.rs
index 0d25ad3..48ca379 100644
--- a/src/sync/mpsc/error.rs
+++ b/src/sync/mpsc/error.rs
@@ -51,6 +51,30 @@ impl<T> From<SendError<T>> for TrySendError<T> {
}
}
+// ===== TryRecvError =====
+
+/// Error returned by `try_recv`.
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+pub enum TryRecvError {
+ /// This **channel** is currently empty, but the **Sender**(s) have not yet
+ /// disconnected, so data may yet become available.
+ Empty,
+ /// The **channel**'s sending half has become disconnected, and there will
+ /// never be any more data received on it.
+ Disconnected,
+}
+
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ TryRecvError::Empty => "receiving on an empty channel".fmt(fmt),
+ TryRecvError::Disconnected => "receiving on a closed channel".fmt(fmt),
+ }
+ }
+}
+
+impl Error for TryRecvError {}
+
// ===== RecvError =====
/// Error returned by `Receiver`.
diff --git a/src/sync/mpsc/list.rs b/src/sync/mpsc/list.rs
index 5dad2ba..53c34d2 100644
--- a/src/sync/mpsc/list.rs
+++ b/src/sync/mpsc/list.rs
@@ -13,23 +13,35 @@ pub(crate) struct Tx<T> {
/// Tail in the `Block` mpmc list.
block_tail: AtomicPtr<Block<T>>,
- /// Position to push the next message. This reference a block and offset
+ /// Position to push the next message. This references a block and offset
/// into the block.
tail_position: AtomicUsize,
}
/// List queue receive handle
pub(crate) struct Rx<T> {
- /// Pointer to the block being processed
+ /// Pointer to the block being processed.
head: NonNull<Block<T>>,
- /// Next slot index to process
+ /// Next slot index to process.
index: usize,
- /// Pointer to the next block pending release
+ /// Pointer to the next block pending release.
free_head: NonNull<Block<T>>,
}
+/// Return value of `Rx::try_pop`.
+pub(crate) enum TryPopResult<T> {
+ /// Successfully popped a value.
+ Ok(T),
+ /// The channel is empty.
+ Empty,
+ /// The channel is empty and closed.
+ Closed,
+ /// The channel is not empty, but the first value is being written.
+ Busy,
+}
+
pub(crate) fn channel<T>() -> (Tx<T>, Rx<T>) {
// Create the initial block shared between the tx and rx halves.
let initial_block = Box::new(Block::new(0));
@@ -218,7 +230,7 @@ impl<T> fmt::Debug for Tx<T> {
}
impl<T> Rx<T> {
- /// Pops the next value off the queue
+ /// Pops the next value off the queue.
pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
// Advance `head`, if needed
if !self.try_advancing_head() {
@@ -240,6 +252,26 @@ impl<T> Rx<T> {
}
}
+ /// Pops the next value off the queue, detecting whether the block
+ /// is busy or empty on failure.
+ ///
+ /// This function exists because `Rx::pop` can return `None` even if the
+ /// channel's queue contains a message that has been completely written.
+ /// This can happen if the fully delivered message is behind another message
+ /// that is in the middle of being written to the block, since the channel
+ /// can't return the messages out of order.
+ pub(crate) fn try_pop(&mut self, tx: &Tx<T>) -> TryPopResult<T> {
+ let tail_position = tx.tail_position.load(Acquire);
+ let result = self.pop(tx);
+
+ match result {
+ Some(block::Read::Value(t)) => TryPopResult::Ok(t),
+ Some(block::Read::Closed) => TryPopResult::Closed,
+ None if tail_position == self.index => TryPopResult::Empty,
+ None => TryPopResult::Busy,
+ }
+ }
+
/// Tries advancing the block pointer to the block referenced by `self.index`.
///
/// Returns `true` if successful, `false` if there is no next block to load.
diff --git a/src/sync/mpsc/unbounded.rs b/src/sync/mpsc/unbounded.rs
index 23c80f6..8961930 100644
--- a/src/sync/mpsc/unbounded.rs
+++ b/src/sync/mpsc/unbounded.rs
@@ -1,6 +1,6 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
-use crate::sync::mpsc::error::SendError;
+use crate::sync::mpsc::error::{SendError, TryRecvError};
use std::fmt;
use std::task::{Context, Poll};
@@ -129,6 +129,50 @@ impl<T> UnboundedReceiver<T> {
poll_fn(|cx| self.poll_recv(cx)).await
}
+ /// Try to receive the next value for this receiver.
+ ///
+ /// This method returns the [`Empty`] error if the channel is currently
+ /// empty, but there are still outstanding [senders] or [permits].
+ ///
+ /// This method returns the [`Disconnected`] error if the channel is
+ /// currently empty, and there are no outstanding [senders] or [permits].
+ ///
+ /// Unlike the [`poll_recv`] method, this method will never return an
+ /// [`Empty`] error spuriously.
+ ///
+ /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
+ /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
+ /// [`poll_recv`]: Self::poll_recv
+ /// [senders]: crate::sync::mpsc::Sender
+ /// [permits]: crate::sync::mpsc::Permit
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ /// use tokio::sync::mpsc::error::TryRecvError;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
+ ///
+ /// tx.send("hello").unwrap();
+ ///
+ /// assert_eq!(Ok("hello"), rx.try_recv());
+ /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+ ///
+ /// tx.send("hello").unwrap();
+ /// // Drop the last sender, closing the channel.
+ /// drop(tx);
+ ///
+ /// assert_eq!(Ok("hello"), rx.try_recv());
+ /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+ /// }
+ /// ```
+ pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
+ self.chan.try_recv()
+ }
+
/// Blocking receive to call outside of asynchronous contexts.
///
/// # Panics
@@ -172,7 +216,7 @@ impl<T> UnboundedReceiver<T> {
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not
- /// closed.
+ /// closed, or if a spurious failure happens.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages
/// sent before it was closed have been received.
@@ -182,6 +226,12 @@ impl<T> UnboundedReceiver<T> {
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
+ ///
+ /// If this method returns `Poll::Pending` due to a spurious failure, then
+ /// the `Waker` will be notified when the situation causing the spurious
+ /// failure has been resolved. Note that receiving such a wakeup does not
+ /// guarantee that the next call will succeed — it could fail with another
+ /// spurious failure.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
diff --git a/src/sync/notify.rs b/src/sync/notify.rs
index 2ea6359..74b97cc 100644
--- a/src/sync/notify.rs
+++ b/src/sync/notify.rs
@@ -8,6 +8,7 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};
+use crate::util::WakeList;
use std::cell::UnsafeCell;
use std::future::Future;
@@ -391,10 +392,7 @@ impl Notify {
/// }
/// ```
pub fn notify_waiters(&self) {
- const NUM_WAKERS: usize = 32;
-
- let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
- let mut curr_waker = 0;
+ let mut wakers = WakeList::new();
// There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock();
@@ -414,7 +412,7 @@ impl Notify {
// concurrently change, as holding the lock is required to
// transition **out** of `WAITING`.
'outer: loop {
- while curr_waker < NUM_WAKERS {
+ while wakers.can_push() {
match waiters.pop_back() {
Some(mut waiter) => {
// Safety: `waiters` lock is still held.
@@ -425,8 +423,7 @@ impl Notify {
waiter.notified = Some(NotificationType::AllWaiters);
if let Some(waker) = waiter.waker.take() {
- wakers[curr_waker] = Some(waker);
- curr_waker += 1;
+ wakers.push(waker);
}
}
None => {
@@ -437,11 +434,7 @@ impl Notify {
drop(waiters);
- for waker in wakers.iter_mut().take(curr_waker) {
- waker.take().unwrap().wake();
- }
-
- curr_waker = 0;
+ wakers.wake_all();
// Acquire the lock again.
waiters = self.waiters.lock();
@@ -456,9 +449,7 @@ impl Notify {
// Release the lock before notifying
drop(waiters);
- for waker in wakers.iter_mut().take(curr_waker) {
- waker.take().unwrap().wake();
- }
+ wakers.wake_all();
}
}
diff --git a/src/sync/tests/loom_mpsc.rs b/src/sync/tests/loom_mpsc.rs
index c12313b..f165e70 100644
--- a/src/sync/tests/loom_mpsc.rs
+++ b/src/sync/tests/loom_mpsc.rs
@@ -132,3 +132,59 @@ fn dropping_unbounded_tx() {
assert!(v.is_none());
});
}
+
+#[test]
+fn try_recv() {
+ loom::model(|| {
+ use crate::sync::{mpsc, Semaphore};
+ use loom::sync::{Arc, Mutex};
+
+ const PERMITS: usize = 2;
+ const TASKS: usize = 2;
+ const CYCLES: usize = 1;
+
+ struct Context {
+ sem: Arc<Semaphore>,
+ tx: mpsc::Sender<()>,
+ rx: Mutex<mpsc::Receiver<()>>,
+ }
+
+ fn run(ctx: &Context) {
+ block_on(async {
+ let permit = ctx.sem.acquire().await;
+ assert_ok!(ctx.rx.lock().unwrap().try_recv());
+ crate::task::yield_now().await;
+ assert_ok!(ctx.tx.clone().try_send(()));
+ drop(permit);
+ });
+ }
+
+ let (tx, rx) = mpsc::channel(PERMITS);
+ let sem = Arc::new(Semaphore::new(PERMITS));
+ let ctx = Arc::new(Context {
+ sem,
+ tx,
+ rx: Mutex::new(rx),
+ });
+
+ for _ in 0..PERMITS {
+ assert_ok!(ctx.tx.clone().try_send(()));
+ }
+
+ let mut ths = Vec::new();
+
+ for _ in 0..TASKS {
+ let ctx = ctx.clone();
+
+ ths.push(thread::spawn(move || {
+ run(&ctx);
+ }));
+ }
+
+ run(&ctx);
+
+ for th in ths {
+ th.join().unwrap();
+ }
+ });
+}
diff --git a/src/sync/watch.rs b/src/sync/watch.rs
index 96d1d16..b5da218 100644
--- a/src/sync/watch.rs
+++ b/src/sync/watch.rs
@@ -123,9 +123,7 @@ pub mod error {
/// Error produced when sending a value fails.
#[derive(Debug)]
- pub struct SendError<T> {
- pub(crate) inner: T,
- }
+ pub struct SendError<T>(pub T);
// ===== impl SendError =====
@@ -165,6 +163,9 @@ mod state {
/// Snapshot of the state. The first bit is used as the CLOSED bit.
/// The remaining bits are used as the version.
+ ///
+ /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
+ /// receivers does not set it.
#[derive(Copy, Clone, Debug)]
pub(super) struct StateSnapshot(usize);
@@ -427,8 +428,8 @@ impl<T> Sender<T> {
/// every receiver has been dropped.
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
// This is pretty much only useful as a hint anyway, so synchronization isn't critical.
- if 0 == self.shared.ref_count_rx.load(Relaxed) {
- return Err(error::SendError { inner: value });
+ if 0 == self.receiver_count() {
+ return Err(error::SendError(value));
}
{
@@ -484,7 +485,7 @@ impl<T> Sender<T> {
/// assert!(tx.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
- self.shared.ref_count_rx.load(Relaxed) == 0
+ self.receiver_count() == 0
}
/// Completes when all receivers have dropped.
@@ -517,23 +518,81 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&self) {
- let notified = self.shared.notify_tx.notified();
+ while self.receiver_count() > 0 {
+ let notified = self.shared.notify_tx.notified();
- if self.shared.ref_count_rx.load(Relaxed) == 0 {
- return;
- }
+ if self.receiver_count() == 0 {
+ return;
+ }
- notified.await;
- debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed));
+ notified.await;
+ // The channel could have been reopened in the meantime by calling
+ // `subscribe`, so we loop again.
+ }
}
- cfg_signal_internal! {
- pub(crate) fn subscribe(&self) -> Receiver<T> {
- let shared = self.shared.clone();
- let version = shared.state.load().version();
+ /// Creates a new [`Receiver`] connected to this `Sender`.
+ ///
+ /// All messages sent before this call to `subscribe` are initially marked
+ /// as seen by the new `Receiver`.
+ ///
+ /// This method can be called even if there are no other receivers. In this
+ /// case, the channel is reopened.
+ ///
+ /// # Examples
+ ///
+ /// The new channel will receive messages sent on this `Sender`.
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, _rx) = watch::channel(0u64);
+ ///
+ /// tx.send(5).unwrap();
+ ///
+ /// let rx = tx.subscribe();
+ /// assert_eq!(5, *rx.borrow());
+ ///
+ /// tx.send(10).unwrap();
+ /// assert_eq!(10, *rx.borrow());
+ /// }
+ /// ```
+ ///
+ /// The most recent message is considered seen by the channel, so this test
+ /// is guaranteed to pass.
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ /// use tokio::time::Duration;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, _rx) = watch::channel(0u64);
+ /// tx.send(5).unwrap();
+ /// let mut rx = tx.subscribe();
+ ///
+ /// tokio::spawn(async move {
+ /// // by spawning and sleeping, the message is sent after `main`
+ /// // hits the call to `changed`.
+ /// # if false {
+ /// tokio::time::sleep(Duration::from_millis(10)).await;
+ /// # }
+ /// tx.send(100).unwrap();
+ /// });
+ ///
+ /// rx.changed().await.unwrap();
+ /// assert_eq!(100, *rx.borrow());
+ /// }
+ /// ```
+ pub fn subscribe(&self) -> Receiver<T> {
+ let shared = self.shared.clone();
+ let version = shared.state.load().version();
- Receiver::from_shared(version, shared)
- }
+ // The CLOSED bit in the state tracks only whether the sender is
+ // dropped, so we do not need to unset it if this reopens the channel.
+ Receiver::from_shared(version, shared)
}
/// Returns the number of receivers that currently exist
diff --git a/src/task/yield_now.rs b/src/task/yield_now.rs
index 251cb93..5eeb46a 100644
--- a/src/task/yield_now.rs
+++ b/src/task/yield_now.rs
@@ -2,37 +2,58 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
-cfg_rt! {
- /// Yields execution back to the Tokio runtime.
- ///
- /// A task yields by awaiting on `yield_now()`, and may resume when that
- /// future completes (with no output.) The current task will be re-added as
- /// a pending task at the _back_ of the pending queue. Any other pending
- /// tasks will be scheduled. No other waking is required for the task to
- /// continue.
- ///
- /// See also the usage example in the [task module](index.html#yield_now).
- #[must_use = "yield_now does nothing unless polled/`await`-ed"]
- pub async fn yield_now() {
- /// Yield implementation
- struct YieldNow {
- yielded: bool,
- }
-
- impl Future for YieldNow {
- type Output = ();
+/// Yields execution back to the Tokio runtime.
+///
+/// A task yields by awaiting on `yield_now()`, and may resume when that future
+/// completes (with no output.) The current task will be re-added as a pending
+/// task at the _back_ of the pending queue. Any other pending tasks will be
+/// scheduled. No other waking is required for the task to continue.
+///
+/// See also the usage example in the [task module](index.html#yield_now).
+///
+/// ## Non-guarantees
+///
+/// This function may not yield all the way up to the executor if there are any
+/// special combinators above it in the call stack. For example, if a
+/// [`tokio::select!`] has another branch complete during the same poll as the
+/// `yield_now()`, then the yield is not propagated all the way up to the
+/// runtime.
+///
+/// It is generally not guaranteed that the runtime behaves like you expect it
+/// to when deciding which task to schedule next after a call to `yield_now()`.
+/// In particular, the runtime may choose to poll the task that just ran
+/// `yield_now()` again immediately without polling any other tasks first. For
+/// example, the runtime will not drive the IO driver between every poll of a
+/// task, and this could result in the runtime polling the current task again
+/// immediately even if there is another task that could make progress if that
+/// other task is waiting for a notification from the IO driver.
+///
+/// In general, changes to the order in which the runtime polls tasks is not
+/// considered a breaking change, and your program should be correct no matter
+/// which order the runtime polls your tasks in.
+///
+/// [`tokio::select!`]: macro@crate::select
+#[must_use = "yield_now does nothing unless polled/`await`-ed"]
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub async fn yield_now() {
+ /// Yield implementation
+ struct YieldNow {
+ yielded: bool,
+ }
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
- if self.yielded {
- return Poll::Ready(());
- }
+ impl Future for YieldNow {
+ type Output = ();
- self.yielded = true;
- cx.waker().wake_by_ref();
- Poll::Pending
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ if self.yielded {
+ return Poll::Ready(());
}
- }
- YieldNow { yielded: false }.await
+ self.yielded = true;
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
}
+
+ YieldNow { yielded: false }.await
}
diff --git a/src/time/clock.rs b/src/time/clock.rs
index a44d75f..fae5c76 100644
--- a/src/time/clock.rs
+++ b/src/time/clock.rs
@@ -64,15 +64,24 @@ cfg_test_util! {
/// Pause time
///
/// The current value of `Instant::now()` is saved and all subsequent calls
- /// to `Instant::now()` until the timer wheel is checked again will return
- /// the saved value. Once the timer wheel is checked, time will immediately
- /// advance to the next registered `Sleep`. This is useful for running tests
- /// that depend on time.
+ /// to `Instant::now()` will return the saved value. The saved value can be
+ /// changed by [`advance`] or by the time auto-advancing once the runtime
+ /// has no work to do. This only affects the `Instant` type in Tokio, and
+ /// the `Instant` in std continues to work as normal.
///
/// Pausing time requires the `current_thread` Tokio runtime. This is the
/// default runtime used by `#[tokio::test]`. The runtime can be initialized
/// with time in a paused state using the `Builder::start_paused` method.
///
+ /// For cases where time is immediately paused, it is better to pause
+ /// the time using the `main` or `test` macro:
+ /// ```
+ /// #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// async fn main() {
+ /// println!("Hello world");
+ /// }
+ /// ```
+ ///
/// # Panics
///
/// Panics if time is already frozen or if called from outside of a
@@ -86,6 +95,7 @@ cfg_test_util! {
/// current time when awaited.
///
/// [`Sleep`]: crate::time::Sleep
+ /// [`advance`]: crate::time::advance
pub fn pause() {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.pause();
@@ -111,11 +121,25 @@ cfg_test_util! {
inner.unfrozen = Some(std::time::Instant::now());
}
- /// Advance time
+ /// Advance time.
///
/// Increments the saved `Instant::now()` value by `duration`. Subsequent
/// calls to `Instant::now()` will return the result of the increment.
///
+ /// This function will make the current time jump forward by the given
+ /// duration in one jump. This means that all `sleep` calls with a deadline
+ /// before the new time will immediately complete "at the same time", and
+ /// the runtime is free to poll them in any order. Additionally, this
+ /// method will not wait for the `sleep` calls it advanced past to complete.
+ /// If you want to do that, you should instead call [`sleep`] and rely on
+ /// the runtime's auto-advance feature.
+ ///
+ /// Note that calls to `sleep` are not guaranteed to complete the first time
+ /// they are polled after a call to `advance`. For example, this can happen
+ /// if the runtime has not yet touched the timer driver after the call to
+ /// `advance`. However if they don't, the runtime will poll the task again
+ /// shortly.
+ ///
/// # Panics
///
/// Panics if time is not frozen or if called from outside of the Tokio
@@ -126,6 +150,8 @@ cfg_test_util! {
/// If the time is paused and there is no work to do, the runtime advances
/// time to the next timer. See [`pause`](pause#auto-advance) for more
/// details.
+ ///
+ /// [`sleep`]: fn@crate::time::sleep
pub async fn advance(duration: Duration) {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
clock.advance(duration);
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs
index 37d2231..f611fbb 100644
--- a/src/time/driver/mod.rs
+++ b/src/time/driver/mod.rs
@@ -288,13 +288,21 @@ impl Handle {
self.process_at_time(now)
}
- pub(self) fn process_at_time(&self, now: u64) {
+ pub(self) fn process_at_time(&self, mut now: u64) {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;
let mut lock = self.get().lock();
- assert!(now >= lock.elapsed);
+ if now < lock.elapsed {
+ // Time went backwards! This normally shouldn't happen as the Rust language
+ // guarantees that an Instant is monotonic, but can happen when running
+ // Linux in a VM on a Windows host due to std incorrectly trusting the
+ // hardware clock to be monotonic.
+ //
+ // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
+ now = lock.elapsed;
+ }
while let Some(entry) = lock.wheel.poll(now) {
debug_assert!(unsafe { entry.is_pending() });
diff --git a/src/time/driver/sleep.rs b/src/time/driver/sleep.rs
index 40f745a..4e9ed65 100644
--- a/src/time/driver/sleep.rs
+++ b/src/time/driver/sleep.rs
@@ -12,10 +12,31 @@ use std::task::{self, Poll};
/// operates at millisecond granularity and should not be used for tasks that
/// require high-resolution timers.
///
+/// To run something regularly on a schedule, see [`interval`].
+///
/// # Cancellation
///
/// Canceling a sleep instance is done by dropping the returned future. No additional
/// cleanup work is required.
+///
+/// # Examples
+///
+/// Wait 100ms and print "100 ms have elapsed".
+///
+/// ```
+/// use tokio::time::{sleep_until, Instant, Duration};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// sleep_until(Instant::now() + Duration::from_millis(100)).await;
+/// println!("100 ms have elapsed");
+/// }
+/// ```
+///
+/// See the documentation for the [`Sleep`] type for more examples.
+///
+/// [`Sleep`]: struct@crate::time::Sleep
+/// [`interval`]: crate::time::interval()
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "delay_until"))]
pub fn sleep_until(deadline: Instant) -> Sleep {
@@ -54,6 +75,9 @@ pub fn sleep_until(deadline: Instant) -> Sleep {
/// }
/// ```
///
+/// See the documentation for the [`Sleep`] type for more examples.
+///
+/// [`Sleep`]: struct@crate::time::Sleep
/// [`interval`]: crate::time::interval()
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "delay_for"))]
@@ -216,6 +240,8 @@ impl Sleep {
/// # }
/// ```
///
+ /// See also the top-level examples.
+ ///
/// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 9065f50..df30f2b 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -4,6 +4,29 @@ cfg_io_driver! {
}
#[cfg(any(
+ // io driver uses `WakeList` directly
+ feature = "net",
+ feature = "process",
+ // `sync` enables `Notify` and `batch_semaphore`, which require `WakeList`.
+ feature = "sync",
+ // `fs` uses `batch_semaphore`, which requires `WakeList`.
+ feature = "fs",
+ // rt and signal use `Notify`, which requires `WakeList`.
+ feature = "rt",
+ feature = "signal",
+))]
+mod wake_list;
+#[cfg(any(
+ feature = "net",
+ feature = "process",
+ feature = "sync",
+ feature = "fs",
+ feature = "rt",
+ feature = "signal",
+))]
+pub(crate) use wake_list::WakeList;
+
+#[cfg(any(
feature = "fs",
feature = "net",
feature = "process",
diff --git a/src/util/trace.rs b/src/util/trace.rs
index c51a5a7..61c155c 100644
--- a/src/util/trace.rs
+++ b/src/util/trace.rs
@@ -11,17 +11,17 @@ cfg_trace! {
#[cfg(tokio_track_caller)]
let span = tracing::trace_span!(
target: "tokio::task",
- "task",
+ "runtime.spawn",
%kind,
+ task.name = %name.unwrap_or_default(),
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
- task.name = %name.unwrap_or_default()
);
#[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
target: "tokio::task",
- "task",
+ "runtime.spawn",
%kind,
- task.name = %name.unwrap_or_default()
+ task.name = %name.unwrap_or_default(),
);
task.instrument(span)
}
diff --git a/src/util/wake_list.rs b/src/util/wake_list.rs
new file mode 100644
index 0000000..aa569dd
--- /dev/null
+++ b/src/util/wake_list.rs
@@ -0,0 +1,53 @@
+use core::mem::MaybeUninit;
+use core::ptr;
+use std::task::Waker;
+
+const NUM_WAKERS: usize = 32;
+
+pub(crate) struct WakeList {
+ inner: [MaybeUninit<Waker>; NUM_WAKERS],
+ curr: usize,
+}
+
+impl WakeList {
+ pub(crate) fn new() -> Self {
+ Self {
+ inner: unsafe {
+ // safety: Create an uninitialized array of `MaybeUninit`. The
+ // `assume_init` is safe because the type we are claiming to
+ // have initialized here is a bunch of `MaybeUninit`s, which do
+ // not require initialization.
+ MaybeUninit::uninit().assume_init()
+ },
+ curr: 0,
+ }
+ }
+
+ #[inline]
+ pub(crate) fn can_push(&self) -> bool {
+ self.curr < NUM_WAKERS
+ }
+
+ pub(crate) fn push(&mut self, val: Waker) {
+ debug_assert!(self.can_push());
+
+ self.inner[self.curr] = MaybeUninit::new(val);
+ self.curr += 1;
+ }
+
+ pub(crate) fn wake_all(&mut self) {
+ assert!(self.curr <= NUM_WAKERS);
+ while self.curr > 0 {
+ self.curr -= 1;
+ let waker = unsafe { ptr::read(self.inner[self.curr].as_mut_ptr()) };
+ waker.wake();
+ }
+ }
+}
+
+impl Drop for WakeList {
+ fn drop(&mut self) {
+ let slice = ptr::slice_from_raw_parts_mut(self.inner.as_mut_ptr() as *mut Waker, self.curr);
+ unsafe { ptr::drop_in_place(slice) };
+ }
+}
diff --git a/tests/fs_file.rs b/tests/fs_file.rs
index bf2f1d7..f645e61 100644
--- a/tests/fs_file.rs
+++ b/tests/fs_file.rs
@@ -1,12 +1,11 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use tokio::fs::File;
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
-use tokio_test::task;
-
use std::io::prelude::*;
use tempfile::NamedTempFile;
+use tokio::fs::File;
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
+use tokio_test::task;
const HELLO: &[u8] = b"hello world...";
@@ -51,6 +50,19 @@ async fn basic_write_and_shutdown() {
}
#[tokio::test]
+async fn rewind_seek_position() {
+ let tempfile = tempfile();
+
+ let mut file = File::create(tempfile.path()).await.unwrap();
+
+ file.seek(SeekFrom::Current(10)).await.unwrap();
+
+ file.rewind().await.unwrap();
+
+ assert_eq!(file.stream_position().await.unwrap(), 0);
+}
+
+#[tokio::test]
async fn coop() {
let mut tempfile = tempfile();
tempfile.write_all(HELLO).unwrap();
diff --git a/tests/io_async_fd.rs b/tests/io_async_fd.rs
index dc21e42..5a6875e 100644
--- a/tests/io_async_fd.rs
+++ b/tests/io_async_fd.rs
@@ -15,7 +15,7 @@ use std::{
use nix::unistd::{close, read, write};
-use futures::{poll, FutureExt};
+use futures::poll;
use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio_test::{assert_err, assert_pending};
@@ -163,10 +163,11 @@ async fn initially_writable() {
afd_a.writable().await.unwrap().clear_ready();
afd_b.writable().await.unwrap().clear_ready();
- futures::select_biased! {
- _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {},
- _ = afd_a.readable().fuse() => panic!("Unexpected readable state"),
- _ = afd_b.readable().fuse() => panic!("Unexpected readable state"),
+ tokio::select! {
+ biased;
+ _ = tokio::time::sleep(Duration::from_millis(10)) => {},
+ _ = afd_a.readable() => panic!("Unexpected readable state"),
+ _ = afd_b.readable() => panic!("Unexpected readable state"),
}
}
@@ -353,12 +354,13 @@ async fn multiple_waiters() {
futures::future::pending::<()>().await;
};
- futures::select_biased! {
- guard = afd_a.readable().fuse() => {
+ tokio::select! {
+ biased;
+ guard = afd_a.readable() => {
tokio::task::yield_now().await;
guard.unwrap().clear_ready()
},
- _ = notify_barrier.fuse() => unreachable!(),
+ _ = notify_barrier => unreachable!(),
}
std::mem::drop(afd_a);
diff --git a/tests/io_fill_buf.rs b/tests/io_fill_buf.rs
new file mode 100644
index 0000000..0b2ebd7
--- /dev/null
+++ b/tests/io_fill_buf.rs
@@ -0,0 +1,34 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tempfile::NamedTempFile;
+use tokio::fs::File;
+use tokio::io::{AsyncBufReadExt, BufReader};
+use tokio_test::assert_ok;
+
+#[tokio::test]
+async fn fill_buf_file() {
+ let file = NamedTempFile::new().unwrap();
+
+ assert_ok!(std::fs::write(file.path(), b"hello"));
+
+ let file = assert_ok!(File::open(file.path()).await);
+ let mut file = BufReader::new(file);
+
+ let mut contents = Vec::new();
+
+ loop {
+ let consumed = {
+ let buffer = assert_ok!(file.fill_buf().await);
+ if buffer.is_empty() {
+ break;
+ }
+ contents.extend_from_slice(buffer);
+ buffer.len()
+ };
+
+ file.consume(consumed);
+ }
+
+ assert_eq!(contents, b"hello");
+}
diff --git a/tests/io_poll_aio.rs b/tests/io_poll_aio.rs
new file mode 100644
index 0000000..f044af5
--- /dev/null
+++ b/tests/io_poll_aio.rs
@@ -0,0 +1,375 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(target_os = "freebsd", feature = "net"))]
+
+use mio_aio::{AioCb, AioFsyncMode, LioCb};
+use std::{
+ future::Future,
+ mem,
+ os::unix::io::{AsRawFd, RawFd},
+ pin::Pin,
+ task::{Context, Poll},
+};
+use tempfile::tempfile;
+use tokio::io::bsd::{Aio, AioSource};
+use tokio_test::assert_pending;
+
+mod aio {
+ use super::*;
+
+ /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
+ struct WrappedAioCb<'a>(AioCb<'a>);
+ impl<'a> AioSource for WrappedAioCb<'a> {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ self.0.register_raw(kq, token)
+ }
+ fn deregister(&mut self) {
+ self.0.deregister_raw()
+ }
+ }
+
+ /// A very crude implementation of an AIO-based future
+ struct FsyncFut(Aio<WrappedAioCb<'static>>);
+
+ impl Future for FsyncFut {
+ type Output = std::io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(_ev)) => {
+ // At this point, we could clear readiness. But there's no
+ // point, since we're about to drop the Aio.
+ let result = (*self.0).0.aio_return();
+ match result {
+ Ok(_) => Poll::Ready(Ok(())),
+ Err(e) => Poll::Ready(Err(e.into())),
+ }
+ }
+ }
+ }
+ }
+
+ /// Low-level AIO Source
+ ///
+ /// An example bypassing mio_aio and Nix to demonstrate how the kevent
+ /// registration actually works, under the hood.
+ struct LlSource(Pin<Box<libc::aiocb>>);
+
+ impl AioSource for LlSource {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
+ sev.sigev_notify = libc::SIGEV_KEVENT;
+ sev.sigev_signo = kq;
+ sev.sigev_value = libc::sigval {
+ sival_ptr: token as *mut libc::c_void,
+ };
+ self.0.aio_sigevent = sev;
+ }
+
+ fn deregister(&mut self) {
+ unsafe {
+ self.0.aio_sigevent = mem::zeroed();
+ }
+ }
+ }
+
+ struct LlFut(Aio<LlSource>);
+
+ impl Future for LlFut {
+ type Output = std::io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(_ev)) => {
+ let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
+ assert_eq!(0, r);
+ Poll::Ready(Ok(()))
+ }
+ }
+ }
+ }
+
+ /// A very simple object that can implement AioSource and can be reused.
+ ///
+ /// mio_aio normally assumes that each AioCb will be consumed on completion.
+ /// This somewhat contrived example shows how an Aio object can be reused
+ /// anyway.
+ struct ReusableFsyncSource {
+ aiocb: Pin<Box<AioCb<'static>>>,
+ fd: RawFd,
+ token: usize,
+ }
+ impl ReusableFsyncSource {
+ fn fsync(&mut self) {
+ self.aiocb.register_raw(self.fd, self.token);
+ self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
+ }
+ fn new(aiocb: AioCb<'static>) -> Self {
+ ReusableFsyncSource {
+ aiocb: Box::pin(aiocb),
+ fd: 0,
+ token: 0,
+ }
+ }
+ fn reset(&mut self, aiocb: AioCb<'static>) {
+ self.aiocb = Box::pin(aiocb);
+ }
+ }
+ impl AioSource for ReusableFsyncSource {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ self.fd = kq;
+ self.token = token;
+ }
+ fn deregister(&mut self) {
+ self.fd = 0;
+ }
+ }
+
+ struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);
+ impl<'a> Future for ReusableFsyncFut<'a> {
+ type Output = std::io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(ev)) => {
+ // Since this future uses a reusable Aio, we must clear
+ // its readiness here. That makes the future
+ // non-idempotent; the caller can't poll it repeatedly after
+ // it has already returned Ready. But that's ok; most
+ // futures behave this way.
+ self.0.clear_ready(ev);
+ let result = (*self.0).aiocb.aio_return();
+ match result {
+ Ok(_) => Poll::Ready(Ok(())),
+ Err(e) => Poll::Ready(Err(e.into())),
+ }
+ }
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn fsync() {
+ let f = tempfile().unwrap();
+ let fd = f.as_raw_fd();
+ let aiocb = AioCb::from_fd(fd, 0);
+ let source = WrappedAioCb(aiocb);
+ let mut poll_aio = Aio::new_for_aio(source).unwrap();
+ (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
+ let fut = FsyncFut(poll_aio);
+ fut.await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn ll_fsync() {
+ let f = tempfile().unwrap();
+ let fd = f.as_raw_fd();
+ let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
+ aiocb.aio_fildes = fd;
+ let source = LlSource(Box::pin(aiocb));
+ let mut poll_aio = Aio::new_for_aio(source).unwrap();
+ let r = unsafe {
+ let p = (*poll_aio).0.as_mut().get_unchecked_mut();
+ libc::aio_fsync(libc::O_SYNC, p)
+ };
+ assert_eq!(0, r);
+ let fut = LlFut(poll_aio);
+ fut.await.unwrap();
+ }
+
+ /// A suitably crafted future type can reuse an Aio object
+ #[tokio::test]
+ async fn reuse() {
+ let f = tempfile().unwrap();
+ let fd = f.as_raw_fd();
+ let aiocb0 = AioCb::from_fd(fd, 0);
+ let source = ReusableFsyncSource::new(aiocb0);
+ let mut poll_aio = Aio::new_for_aio(source).unwrap();
+ poll_aio.fsync();
+ let fut0 = ReusableFsyncFut(&mut poll_aio);
+ fut0.await.unwrap();
+
+ let aiocb1 = AioCb::from_fd(fd, 0);
+ poll_aio.reset(aiocb1);
+ let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
+ assert_pending!(poll_aio.poll_ready(&mut ctx));
+ poll_aio.fsync();
+ let fut1 = ReusableFsyncFut(&mut poll_aio);
+ fut1.await.unwrap();
+ }
+}
+
+mod lio {
+ use super::*;
+
+ struct WrappedLioCb<'a>(LioCb<'a>);
+ impl<'a> AioSource for WrappedLioCb<'a> {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ self.0.register_raw(kq, token)
+ }
+ fn deregister(&mut self) {
+ self.0.deregister_raw()
+ }
+ }
+
+ /// A very crude lio_listio-based Future
+ struct LioFut(Option<Aio<WrappedLioCb<'static>>>);
+
+ impl Future for LioFut {
+ type Output = std::io::Result<Vec<isize>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(_ev)) => {
+ // At this point, we could clear readiness. But there's no
+ // point, since we're about to drop the Aio.
+ let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
+ iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
+ });
+ Poll::Ready(Ok(r))
+ }
+ }
+ }
+ }
+
+ /// Minimal example demonstrating reuse of an Aio object with lio
+ /// readiness. mio_aio::LioCb actually does something similar under the
+ /// hood.
+ struct ReusableLioSource {
+ liocb: Option<LioCb<'static>>,
+ fd: RawFd,
+ token: usize,
+ }
+ impl ReusableLioSource {
+ fn new(liocb: LioCb<'static>) -> Self {
+ ReusableLioSource {
+ liocb: Some(liocb),
+ fd: 0,
+ token: 0,
+ }
+ }
+ fn reset(&mut self, liocb: LioCb<'static>) {
+ self.liocb = Some(liocb);
+ }
+ fn submit(&mut self) {
+ self.liocb
+ .as_mut()
+ .unwrap()
+ .register_raw(self.fd, self.token);
+ self.liocb.as_mut().unwrap().submit().unwrap();
+ }
+ }
+ impl AioSource for ReusableLioSource {
+ fn register(&mut self, kq: RawFd, token: usize) {
+ self.fd = kq;
+ self.token = token;
+ }
+ fn deregister(&mut self) {
+ self.fd = 0;
+ }
+ }
+ struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);
+ impl<'a> Future for ReusableLioFut<'a> {
+ type Output = std::io::Result<Vec<isize>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let poll_result = self.0.poll_ready(cx);
+ match poll_result {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(ev)) => {
+ // Since this future uses a reusable Aio, we must clear
+ // its readiness here. That makes the future
+ // non-idempotent; the caller can't poll it repeatedly after
+ // it has already returned Ready. But that's ok; most
+ // futures behave this way.
+ self.0.clear_ready(ev);
+ let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
+ iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
+ });
+ Poll::Ready(Ok(r))
+ }
+ }
+ }
+ }
+
+ /// An lio_listio operation with one write element
+ #[tokio::test]
+ async fn onewrite() {
+ const WBUF: &[u8] = b"abcdef";
+ let f = tempfile().unwrap();
+
+ let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
+ builder = builder.emplace_slice(
+ f.as_raw_fd(),
+ 0,
+ &WBUF[..],
+ 0,
+ mio_aio::LioOpcode::LIO_WRITE,
+ );
+ let liocb = builder.finish();
+ let source = WrappedLioCb(liocb);
+ let mut poll_aio = Aio::new_for_lio(source).unwrap();
+
+ // Send the operation to the kernel
+ (*poll_aio).0.submit().unwrap();
+ let fut = LioFut(Some(poll_aio));
+ let v = fut.await.unwrap();
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0] as usize, WBUF.len());
+ }
+
+ /// A suitably crafted future type can reuse an Aio object
+ #[tokio::test]
+ async fn reuse() {
+ const WBUF: &[u8] = b"abcdef";
+ let f = tempfile().unwrap();
+
+ let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
+ builder0 = builder0.emplace_slice(
+ f.as_raw_fd(),
+ 0,
+ &WBUF[..],
+ 0,
+ mio_aio::LioOpcode::LIO_WRITE,
+ );
+ let liocb0 = builder0.finish();
+ let source = ReusableLioSource::new(liocb0);
+ let mut poll_aio = Aio::new_for_aio(source).unwrap();
+ poll_aio.submit();
+ let fut0 = ReusableLioFut(&mut poll_aio);
+ let v = fut0.await.unwrap();
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0] as usize, WBUF.len());
+
+ // Now reuse the same Aio
+ let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
+ builder1 = builder1.emplace_slice(
+ f.as_raw_fd(),
+ 0,
+ &WBUF[..],
+ 0,
+ mio_aio::LioOpcode::LIO_WRITE,
+ );
+ let liocb1 = builder1.finish();
+ poll_aio.reset(liocb1);
+ let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
+ assert_pending!(poll_aio.poll_ready(&mut ctx));
+ poll_aio.submit();
+ let fut1 = ReusableLioFut(&mut poll_aio);
+ let v = fut1.await.unwrap();
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0] as usize, WBUF.len());
+ }
+}
diff --git a/tests/process_kill_on_drop.rs b/tests/process_kill_on_drop.rs
index 00f5c6d..658e4ad 100644
--- a/tests/process_kill_on_drop.rs
+++ b/tests/process_kill_on_drop.rs
@@ -1,6 +1,7 @@
#![cfg(all(unix, feature = "process"))]
#![warn(rust_2018_idioms)]
+use std::io::ErrorKind;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::AsyncReadExt;
@@ -24,11 +25,12 @@ async fn kill_on_drop() {
",
]);
- let mut child = cmd
- .kill_on_drop(true)
- .stdout(Stdio::piped())
- .spawn()
- .unwrap();
+ let e = cmd.kill_on_drop(true).stdout(Stdio::piped()).spawn();
+ if e.is_err() && e.as_ref().unwrap_err().kind() == ErrorKind::NotFound {
+ println!("bash not available; skipping test");
+ return;
+ }
+ let mut child = e.unwrap();
sleep(Duration::from_secs(2)).await;
diff --git a/tests/sync_mpsc.rs b/tests/sync_mpsc.rs
index cd43ad4..1947d26 100644
--- a/tests/sync_mpsc.rs
+++ b/tests/sync_mpsc.rs
@@ -5,7 +5,7 @@
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
@@ -328,6 +328,27 @@ async fn try_send_fail() {
}
#[tokio::test]
+async fn try_send_fail_with_try_recv() {
+ let (tx, mut rx) = mpsc::channel(1);
+
+ tx.try_send("hello").unwrap();
+
+ // This should fail
+ match assert_err!(tx.try_send("fail")) {
+ TrySendError::Full(..) => {}
+ _ => panic!(),
+ }
+
+ assert_eq!(rx.try_recv(), Ok("hello"));
+
+ assert_ok!(tx.try_send("goodbye"));
+ drop(tx);
+
+ assert_eq!(rx.try_recv(), Ok("goodbye"));
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+}
+
+#[tokio::test]
async fn try_reserve_fails() {
let (tx, mut rx) = mpsc::channel(1);
@@ -389,13 +410,15 @@ fn dropping_rx_closes_channel_for_try() {
drop(rx);
- {
- let err = assert_err!(tx.try_send(msg.clone()));
- match err {
- TrySendError::Closed(..) => {}
- _ => panic!(),
- }
- }
+ assert!(matches!(
+ tx.try_send(msg.clone()),
+ Err(TrySendError::Closed(_))
+ ));
+ assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_))));
+ assert!(matches!(
+ tx.try_reserve_owned(),
+ Err(TrySendError::Closed(_))
+ ));
assert_eq!(1, Arc::strong_count(&msg));
}
@@ -494,3 +517,83 @@ async fn permit_available_not_acquired_close() {
drop(permit2);
assert!(rx.recv().await.is_none());
}
+
+#[test]
+fn try_recv_bounded() {
+ let (tx, mut rx) = mpsc::channel(5);
+
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ assert!(tx.try_send("hello").is_err());
+
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ assert_eq!(Ok("hello"), rx.try_recv());
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ assert!(tx.try_send("hello").is_err());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ drop(tx);
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Ok("hello"), rx.try_recv());
+ assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+}
+
+#[test]
+fn try_recv_unbounded() {
+ for num in 0..100 {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ for i in 0..num {
+ tx.send(i).unwrap();
+ }
+
+ for i in 0..num {
+ assert_eq!(rx.try_recv(), Ok(i));
+ }
+
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+ drop(tx);
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+ }
+}
+
+#[test]
+fn try_recv_close_while_empty_bounded() {
+ let (tx, mut rx) = mpsc::channel::<()>(5);
+
+ assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+ drop(tx);
+ assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+}
+
+#[test]
+fn try_recv_close_while_empty_unbounded() {
+ let (tx, mut rx) = mpsc::unbounded_channel::<()>();
+
+ assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
+ drop(tx);
+ assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
+}
diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs
index a2a276d..b7bbaf7 100644
--- a/tests/sync_watch.rs
+++ b/tests/sync_watch.rs
@@ -186,3 +186,18 @@ fn borrow_and_update() {
assert_eq!(*rx.borrow_and_update(), "three");
assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
}
+
+#[test]
+fn reopened_after_subscribe() {
+ let (tx, rx) = watch::channel("one");
+ assert!(!tx.is_closed());
+
+ drop(rx);
+ assert!(tx.is_closed());
+
+ let rx = tx.subscribe();
+ assert!(!tx.is_closed());
+
+ drop(rx);
+ assert!(tx.is_closed());
+}