author     Jeff Vander Stoep <jeffv@google.com>  2021-11-16 18:39:59 +0000
committer  Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>  2021-11-16 18:39:59 +0000
commit     5b1b46ec52426b0a27bf3a5a2908d3472a52e5d0 (patch)
tree       a86a1094890ceb522b599e1f37f24c2e16239fd3
parent     b4b1fce325271157868568d1634fc4425e84c9ae (diff)
parent     786869e7d42e26203f1c4ed10cfa383eef81ae7b (diff)
download   tokio-5b1b46ec52426b0a27bf3a5a2908d3472a52e5d0.tar.gz
Update to 1.14.0 am: 1db412d2c3 am: 786869e7d4
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1892699

Change-Id: Ic71ca40bfd185d21110f7ff7ee5de5a62e698a68
-rw-r--r--  .cargo_vcs_info.json | 2
-rw-r--r--  CHANGELOG.md | 103
-rw-r--r--  Cargo.toml | 17
-rw-r--r--  Cargo.toml.orig | 8
-rw-r--r--  METADATA | 8
-rw-r--r--  README.md | 2
-rw-r--r--  patches/test_fix.patch | 21
-rw-r--r--  src/coop.rs | 10
-rw-r--r--  src/fs/create_dir.rs | 2
-rw-r--r--  src/fs/dir_builder.rs | 2
-rw-r--r--  src/fs/file.rs | 4
-rw-r--r--  src/fs/open_options.rs | 2
-rw-r--r--  src/fs/read_dir.rs | 4
-rw-r--r--  src/future/maybe_done.rs | 8
-rw-r--r--  src/future/poll_fn.rs | 2
-rw-r--r--  src/io/async_fd.rs | 4
-rw-r--r--  src/io/blocking.rs | 4
-rw-r--r--  src/io/bsd/poll_aio.rs | 8
-rw-r--r--  src/io/driver/interest.rs | 12
-rw-r--r--  src/io/driver/mod.rs | 18
-rw-r--r--  src/io/driver/ready.rs | 12
-rw-r--r--  src/io/driver/scheduled_io.rs | 20
-rw-r--r--  src/io/mod.rs | 2
-rw-r--r--  src/io/poll_evented.rs | 4
-rw-r--r--  src/io/split.rs | 2
-rw-r--r--  src/io/stdio_common.rs | 2
-rw-r--r--  src/io/util/async_write_ext.rs | 8
-rw-r--r--  src/io/util/buf_reader.rs | 2
-rw-r--r--  src/io/util/copy.rs | 10
-rw-r--r--  src/io/util/lines.rs | 6
-rw-r--r--  src/lib.rs | 27
-rw-r--r--  src/loom/std/mutex.rs | 2
-rw-r--r--  src/macros/cfg.rs | 11
-rw-r--r--  src/macros/join.rs | 2
-rw-r--r--  src/macros/mod.rs | 5
-rw-r--r--  src/macros/scoped_tls.rs | 2
-rw-r--r--  src/macros/select.rs | 6
-rw-r--r--  src/macros/trace.rs | 27
-rw-r--r--  src/macros/try_join.rs | 41
-rw-r--r--  src/net/tcp/listener.rs | 2
-rw-r--r--  src/net/tcp/mod.rs | 2
-rw-r--r--  src/net/tcp/socket.rs | 20
-rw-r--r--  src/net/tcp/split.rs | 215
-rw-r--r--  src/net/tcp/split_owned.rs | 213
-rw-r--r--  src/net/tcp/stream.rs | 18
-rw-r--r--  src/net/udp.rs | 90
-rw-r--r--  src/net/unix/datagram/socket.rs | 88
-rw-r--r--  src/net/unix/listener.rs | 2
-rw-r--r--  src/net/unix/mod.rs | 2
-rw-r--r--  src/net/unix/split.rs | 207
-rw-r--r--  src/net/unix/split_owned.rs | 206
-rw-r--r--  src/net/unix/stream.rs | 50
-rw-r--r--  src/net/unix/ucred.rs | 8
-rw-r--r--  src/net/windows/named_pipe.rs | 46
-rw-r--r--  src/park/mod.rs | 12
-rw-r--r--  src/park/thread.rs | 6
-rw-r--r--  src/process/mod.rs | 8
-rw-r--r--  src/process/unix/driver.rs | 2
-rw-r--r--  src/process/unix/mod.rs | 2
-rw-r--r--  src/runtime/basic_scheduler.rs | 34
-rw-r--r--  src/runtime/blocking/pool.rs | 21
-rw-r--r--  src/runtime/blocking/shutdown.rs | 4
-rw-r--r--  src/runtime/blocking/task.rs | 4
-rw-r--r--  src/runtime/context.rs | 70
-rw-r--r--  src/runtime/enter.rs | 2
-rw-r--r--  src/runtime/handle.rs | 92
-rw-r--r--  src/runtime/mod.rs | 46
-rw-r--r--  src/runtime/stats/stats.rs | 27
-rw-r--r--  src/runtime/task/core.rs | 16
-rw-r--r--  src/runtime/task/error.rs | 4
-rw-r--r--  src/runtime/task/harness.rs | 18
-rw-r--r--  src/runtime/task/inject.rs | 12
-rw-r--r--  src/runtime/task/list.rs | 10
-rw-r--r--  src/runtime/task/mod.rs | 12
-rw-r--r--  src/runtime/task/raw.rs | 12
-rw-r--r--  src/runtime/task/state.rs | 36
-rw-r--r--  src/runtime/tests/loom_basic_scheduler.rs | 58
-rw-r--r--  src/runtime/thread_pool/idle.rs | 2
-rw-r--r--  src/runtime/thread_pool/mod.rs | 2
-rw-r--r--  src/runtime/thread_pool/worker.rs | 14
-rw-r--r--  src/signal/ctrl_c.rs | 9
-rw-r--r--  src/signal/mod.rs | 2
-rw-r--r--  src/signal/reusable_box.rs | 12
-rw-r--r--  src/sync/batch_semaphore.rs | 8
-rw-r--r--  src/sync/broadcast.rs | 32
-rw-r--r--  src/sync/mpsc/block.rs | 4
-rw-r--r--  src/sync/mpsc/bounded.rs | 20
-rw-r--r--  src/sync/mpsc/chan.rs | 6
-rw-r--r--  src/sync/mpsc/error.rs | 2
-rw-r--r--  src/sync/mpsc/list.rs | 4
-rw-r--r--  src/sync/mpsc/unbounded.rs | 2
-rw-r--r--  src/sync/mutex.rs | 38
-rw-r--r--  src/sync/notify.rs | 25
-rw-r--r--  src/sync/once_cell.rs | 8
-rw-r--r--  src/sync/oneshot.rs | 286
-rw-r--r--  src/sync/rwlock/owned_read_guard.rs | 2
-rw-r--r--  src/sync/rwlock/owned_write_guard.rs | 2
-rw-r--r--  src/sync/rwlock/owned_write_guard_mapped.rs | 2
-rw-r--r--  src/sync/rwlock/read_guard.rs | 2
-rw-r--r--  src/sync/rwlock/write_guard.rs | 2
-rw-r--r--  src/sync/rwlock/write_guard_mapped.rs | 2
-rw-r--r--  src/sync/task/atomic_waker.rs | 80
-rw-r--r--  src/sync/tests/atomic_waker.rs | 39
-rw-r--r--  src/sync/tests/loom_atomic_waker.rs | 55
-rw-r--r--  src/sync/tests/loom_oneshot.rs | 29
-rw-r--r--  src/sync/tests/mod.rs | 1
-rw-r--r--  src/sync/tests/notify.rs | 44
-rw-r--r--  src/sync/watch.rs | 48
-rw-r--r--  src/task/blocking.rs | 61
-rw-r--r--  src/task/builder.rs | 5
-rw-r--r--  src/task/local.rs | 18
-rw-r--r--  src/time/clock.rs | 12
-rw-r--r--  src/time/driver/entry.rs | 2
-rw-r--r--  src/time/driver/handle.rs | 6
-rw-r--r--  src/time/driver/mod.rs | 18
-rw-r--r--  src/time/driver/sleep.rs | 176
-rw-r--r--  src/time/driver/wheel/level.rs | 2
-rw-r--r--  src/time/driver/wheel/mod.rs | 12
-rw-r--r--  src/time/driver/wheel/stack.rs | 4
-rw-r--r--  src/time/error.rs | 4
-rw-r--r--  src/time/interval.rs | 13
-rw-r--r--  src/time/timeout.rs | 16
-rw-r--r--  src/util/bit.rs | 6
-rw-r--r--  src/util/error.rs | 8
-rw-r--r--  src/util/linked_list.rs | 8
-rw-r--r--  src/util/rand.rs | 4
-rw-r--r--  src/util/slab.rs | 30
-rw-r--r--  src/util/trace.rs | 13
-rw-r--r--  src/util/vec_deque_cell.rs | 2
-rw-r--r--  src/util/wake.rs | 6
-rw-r--r--  tests/macros_select.rs | 26
-rw-r--r--  tests/macros_test.rs | 16
-rw-r--r--  tests/rt_basic.rs | 35
-rw-r--r--  tests/rt_threaded.rs | 25
-rw-r--r--  tests/udp.rs | 44
-rw-r--r--  tests/uds_datagram.rs | 47
136 files changed, 2927 insertions, 585 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 0577e49..92e2ebe 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "1ed89aa5cf7e5b9524b9e08a02030d222fd63417"
+ "sha1": "623c09c52c2c38a8d75e94c166593547e8477707"
}
}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 16e44e5..afa8bf0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,102 @@
+# 1.14.0 (November 15, 2021)
+
+### Fixed
+
+- macros: fix compiler errors when using `mut` patterns in `select!` ([#4211])
+- sync: fix a data race between `oneshot::Sender::send` and awaiting a
+ `oneshot::Receiver` when the oneshot has been closed ([#4226])
+- sync: make `AtomicWaker` panic safe ([#3689])
+- runtime: fix basic scheduler dropping tasks outside a runtime context
+ ([#4213])
+
+### Added
+
+- stats: add `RuntimeStats::busy_duration_total` ([#4179], [#4223])
+
+### Changed
+
+- io: updated `copy` buffer size to match `std::io::copy` ([#4209])
+
+### Documented
+
+- io: rename buffer to file in doc-test ([#4230])
+- sync: fix Notify example ([#4212])
+
+[#4211]: https://github.com/tokio-rs/tokio/pull/4211
+[#4226]: https://github.com/tokio-rs/tokio/pull/4226
+[#3689]: https://github.com/tokio-rs/tokio/pull/3689
+[#4213]: https://github.com/tokio-rs/tokio/pull/4213
+[#4179]: https://github.com/tokio-rs/tokio/pull/4179
+[#4223]: https://github.com/tokio-rs/tokio/pull/4223
+[#4209]: https://github.com/tokio-rs/tokio/pull/4209
+[#4230]: https://github.com/tokio-rs/tokio/pull/4230
+[#4212]: https://github.com/tokio-rs/tokio/pull/4212
+
+# 1.13.1 (November 15, 2021)
+
+### Fixed
+
+- sync: fix a data race between `oneshot::Sender::send` and awaiting a
+ `oneshot::Receiver` when the oneshot has been closed ([#4226])
+
+[#4226]: https://github.com/tokio-rs/tokio/pull/4226
+
+# 1.13.0 (October 29, 2021)
+
+### Fixed
+
+- sync: fix `Notify` to clone the waker before locking its waiter list ([#4129])
+- tokio: add riscv32 to non atomic64 architectures ([#4185])
+
+### Added
+
+- net: add `poll_{recv,send}_ready` methods to `udp` and `uds_datagram` ([#4131])
+- net: add `try_*`, `readable`, `writable`, `ready`, and `peer_addr` methods to split halves ([#4120])
+- sync: add `blocking_lock` to `Mutex` ([#4130])
+- sync: add `watch::Sender::send_replace` ([#3962], [#4195])
+- sync: expand `Debug` for `Mutex<T>` impl to unsized `T` ([#4134])
+- tracing: instrument time::Sleep ([#4072])
+- tracing: use structured location fields for spawned tasks ([#4128])
+
+### Changed
+
+- io: add assert in `copy_bidirectional` that `poll_write` is sensible ([#4125])
+- macros: use qualified syntax when polling in `select!` ([#4192])
+- runtime: handle `block_on` wakeups better ([#4157])
+- task: allocate callback on heap immediately in debug mode ([#4203])
+- tokio: assert platform-minimum requirements at build time ([#3797])
+
+### Documented
+
+- docs: conversion of doc comments to indicative mood ([#4174])
+- docs: add returning on the first error example for `try_join!` ([#4133])
+- docs: fixing broken links in `tokio/src/lib.rs` ([#4132])
+- signal: add example with background listener ([#4171])
+- sync: add more oneshot examples ([#4153])
+- time: document `Interval::tick` cancel safety ([#4152])
+
+[#3797]: https://github.com/tokio-rs/tokio/pull/3797
+[#3962]: https://github.com/tokio-rs/tokio/pull/3962
+[#4072]: https://github.com/tokio-rs/tokio/pull/4072
+[#4120]: https://github.com/tokio-rs/tokio/pull/4120
+[#4125]: https://github.com/tokio-rs/tokio/pull/4125
+[#4128]: https://github.com/tokio-rs/tokio/pull/4128
+[#4129]: https://github.com/tokio-rs/tokio/pull/4129
+[#4130]: https://github.com/tokio-rs/tokio/pull/4130
+[#4131]: https://github.com/tokio-rs/tokio/pull/4131
+[#4132]: https://github.com/tokio-rs/tokio/pull/4132
+[#4133]: https://github.com/tokio-rs/tokio/pull/4133
+[#4134]: https://github.com/tokio-rs/tokio/pull/4134
+[#4152]: https://github.com/tokio-rs/tokio/pull/4152
+[#4153]: https://github.com/tokio-rs/tokio/pull/4153
+[#4157]: https://github.com/tokio-rs/tokio/pull/4157
+[#4171]: https://github.com/tokio-rs/tokio/pull/4171
+[#4174]: https://github.com/tokio-rs/tokio/pull/4174
+[#4185]: https://github.com/tokio-rs/tokio/pull/4185
+[#4192]: https://github.com/tokio-rs/tokio/pull/4192
+[#4195]: https://github.com/tokio-rs/tokio/pull/4195
+[#4203]: https://github.com/tokio-rs/tokio/pull/4203
+
# 1.12.0 (September 21, 2021)
### Fixed
@@ -14,10 +113,6 @@
- runtime: callback when a worker parks and unparks ([#4070])
- sync: implement `try_recv` for mpsc channels ([#4113])
-### Changed
-
-- macros: run runtime inside `LocalSet` when using macro ([#4027])
-
### Documented
- docs: clarify CPU-bound tasks on Tokio ([#4105])
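A minimal sketch of one of the 1.13.0 additions listed above, `watch::Sender::send_replace` ([#3962], [#4195]): it stores a new value and hands back the one it replaced, notifying receivers like a normal `send`.

```rust
use tokio::sync::watch;

// Sketch: `send_replace` returns the previous value.
#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(1);

    let previous = tx.send_replace(2);
    assert_eq!(previous, 1);

    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow(), 2);
}
```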
diff --git a/Cargo.toml b/Cargo.toml
index c09b22a..ec9b335 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,21 +3,20 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
edition = "2018"
name = "tokio"
-version = "1.12.0"
+version = "1.14.0"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio/1.12.0/tokio/"
+documentation = "https://docs.rs/tokio/1.14.0/tokio/"
readme = "README.md"
keywords = ["io", "async", "non-blocking", "futures"]
categories = ["asynchronous", "network-programming"]
@@ -57,7 +56,7 @@ optional = true
version = "0.2.0"
[dependencies.tokio-macros]
-version = "1.1.0"
+version = "1.6.0"
optional = true
[dev-dependencies.async-stream]
version = "0.3"
@@ -112,7 +111,7 @@ features = ["futures", "checkpoint"]
version = "0.6.0"
features = ["tokio"]
[target."cfg(tokio_unstable)".dependencies.tracing]
-version = "0.1.21"
+version = "0.1.25"
features = ["std"]
optional = true
default-features = false
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index d2e4696..348ec46 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -7,12 +7,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
-version = "1.12.0"
+version = "1.14.0"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
-documentation = "https://docs.rs/tokio/1.12.0/tokio/"
+documentation = "https://docs.rs/tokio/1.14.0/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """
@@ -87,7 +87,7 @@ test-util = ["rt", "sync", "time"]
time = []
[dependencies]
-tokio-macros = { version = "1.1.0", path = "../tokio-macros", optional = true }
+tokio-macros = { version = "1.6.0", path = "../tokio-macros", optional = true }
pin-project-lite = "0.2.0"
@@ -102,7 +102,7 @@ parking_lot = { version = "0.11.0", optional = true }
# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_unstable)'.dependencies]
-tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full
+tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full
[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
diff --git a/METADATA b/METADATA
index cd6df63..1d167d1 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio/tokio-1.12.0.crate"
+ value: "https://static.crates.io/crates/tokio/tokio-1.14.0.crate"
}
- version: "1.12.0"
+ version: "1.14.0"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 9
- day: 30
+ month: 11
+ day: 16
}
}
diff --git a/README.md b/README.md
index 4d99c88..19f049c 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:
```toml
[dependencies]
-tokio = { version = "1.12.0", features = ["full"] }
+tokio = { version = "1.14.0", features = ["full"] }
```
Then, on your main.rs:
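The diff context stops before the README's program listing. For reference, a minimal sketch of the kind of `main.rs` the README goes on to show (an echo server; the 127.0.0.1:8080 address is chosen for illustration):

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            // Echo whatever the peer sends back to it until it disconnects.
            loop {
                let n = match socket.read(&mut buf).await {
                    // Socket closed.
                    Ok(0) => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {:?}", e);
                        return;
                    }
                };

                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("failed to write to socket; err = {:?}", e);
                    return;
                }
            }
        });
    }
}
```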
diff --git a/patches/test_fix.patch b/patches/test_fix.patch
deleted file mode 100644
index efc8c27..0000000
--- a/patches/test_fix.patch
+++ /dev/null
@@ -1,21 +0,0 @@
-diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs
-index a70f49b81a..f8a35d0ede 100644
---- a/tests/task_local_set.rs
-+++ b/tests/task_local_set.rs
-@@ -16,16 +16,6 @@ use std::sync::atomic::Ordering::{self, SeqCst};
- use std::sync::atomic::{AtomicBool, AtomicUsize};
- use std::time::Duration;
-
--#[tokio::test(flavor = "current_thread")]
--async fn localset_implicit_current_thread() {
-- task::spawn_local(async {}).await.unwrap();
--}
--
--#[tokio::test(flavor = "multi_thread")]
--async fn localset_implicit_multi_thread() {
-- task::spawn_local(async {}).await.unwrap();
--}
--
- #[tokio::test(flavor = "current_thread")]
- async fn local_basic_scheduler() {
- LocalSet::new()
\ No newline at end of file
diff --git a/src/coop.rs b/src/coop.rs
index 16d93fb..256e962 100644
--- a/src/coop.rs
+++ b/src/coop.rs
@@ -69,14 +69,14 @@ cfg_rt_multi_thread! {
}
}
-/// Run the given closure with a cooperative task budget. When the function
+/// Runs the given closure with a cooperative task budget. When the function
/// returns, the budget is reset to the value prior to calling the function.
#[inline(always)]
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::initial(), f)
}
-/// Run the given closure with an unconstrained task budget. When the function returns, the budget
+/// Runs the given closure with an unconstrained task budget. When the function returns, the budget
/// is reset to the value prior to calling the function.
#[inline(always)]
pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
@@ -108,7 +108,7 @@ fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
}
cfg_rt_multi_thread! {
- /// Set the current task's budget
+ /// Sets the current task's budget.
pub(crate) fn set(budget: Budget) {
CURRENT.with(|cell| cell.set(budget))
}
@@ -120,7 +120,7 @@ cfg_rt_multi_thread! {
}
cfg_rt! {
- /// Forcibly remove the budgeting constraints early.
+ /// Forcibly removes the budgeting constraints early.
///
/// Returns the remaining budget
pub(crate) fn stop() -> Budget {
@@ -186,7 +186,7 @@ cfg_coop! {
}
impl Budget {
- /// Decrement the budget. Returns `true` if successful. Decrementing fails
+ /// Decrements the budget. Returns `true` if successful. Decrementing fails
/// when there is not enough remaining budget.
fn decrement(&mut self) -> bool {
if let Some(num) = &mut self.0 {
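The budget helpers documented above all follow the same save-and-restore shape: install a budget for the duration of a closure, then put the previous value back. A simplified, standalone sketch of that shape, with a hypothetical `BUDGET` cell standing in for tokio's internal `CURRENT`:

```rust
use std::cell::Cell;

thread_local! {
    // Hypothetical stand-in for tokio's internal `CURRENT` budget cell.
    static BUDGET: Cell<Option<u8>> = Cell::new(None);
}

// Install a budget for the duration of the closure, then restore the previous
// value. (The real implementation restores through a drop guard, so the old
// value also comes back if the closure panics.)
fn with_budget<R>(budget: Option<u8>, f: impl FnOnce() -> R) -> R {
    BUDGET.with(|cell| {
        let prev = cell.replace(budget);
        let result = f();
        cell.set(prev);
        result
    })
}

fn main() {
    let seen = with_budget(Some(128), || BUDGET.with(|cell| cell.get()));
    assert_eq!(seen, Some(128));
    assert_eq!(BUDGET.with(|cell| cell.get()), None);
}
```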
diff --git a/src/fs/create_dir.rs b/src/fs/create_dir.rs
index e03b04d..4119695 100644
--- a/src/fs/create_dir.rs
+++ b/src/fs/create_dir.rs
@@ -3,7 +3,7 @@ use crate::fs::asyncify;
use std::io;
use std::path::Path;
-/// Creates a new, empty directory at the provided path
+/// Creates a new, empty directory at the provided path.
///
/// This is an async version of [`std::fs::create_dir`][std]
///
diff --git a/src/fs/dir_builder.rs b/src/fs/dir_builder.rs
index b184934..97168bf 100644
--- a/src/fs/dir_builder.rs
+++ b/src/fs/dir_builder.rs
@@ -14,7 +14,7 @@ pub struct DirBuilder {
/// Indicates whether to create parent directories if they are missing.
recursive: bool,
- /// Set the Unix mode for newly created directories.
+ /// Sets the Unix mode for newly created directories.
#[cfg(unix)]
pub(super) mode: Option<u32>,
}
diff --git a/src/fs/file.rs b/src/fs/file.rs
index 5286e6c..61071cf 100644
--- a/src/fs/file.rs
+++ b/src/fs/file.rs
@@ -74,7 +74,7 @@ use std::fs::File as StdFile;
/// # }
/// ```
///
-/// Read the contents of a file into a buffer
+/// Read the contents of a file into a buffer:
///
/// ```no_run
/// use tokio::fs::File;
@@ -383,7 +383,7 @@ impl File {
asyncify(move || std.metadata()).await
}
- /// Create a new `File` instance that shares the same underlying file handle
+ /// Creates a new `File` instance that shares the same underlying file handle
/// as the existing `File` instance. Reads, writes, and seeks will affect both
/// File instances simultaneously.
///
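The "read the contents of a file into a buffer" pattern mentioned in the doc comment above usually pairs `File::open` with `AsyncReadExt::read_to_end`; a minimal sketch, assuming a `foo.txt` exists in the working directory:

```rust
use tokio::fs::File;
use tokio::io::AsyncReadExt;

// Sketch: open a file and read all of its contents into a Vec<u8>.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut file = File::open("foo.txt").await?;

    let mut contents = vec![];
    file.read_to_end(&mut contents).await?;

    println!("read {} bytes", contents.len());
    Ok(())
}
```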
diff --git a/src/fs/open_options.rs b/src/fs/open_options.rs
index 3e73529..f3b4654 100644
--- a/src/fs/open_options.rs
+++ b/src/fs/open_options.rs
@@ -430,7 +430,7 @@ feature! {
self
}
- /// Pass custom flags to the `flags` argument of `open`.
+ /// Passes custom flags to the `flags` argument of `open`.
///
/// The bits that define the access mode are masked out with `O_ACCMODE`, to
/// ensure they do not interfere with the access mode set by Rusts options.
diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs
index 514d59c..281ea4c 100644
--- a/src/fs/read_dir.rs
+++ b/src/fs/read_dir.rs
@@ -34,7 +34,7 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
Ok(ReadDir(State::Idle(Some(std))))
}
-/// Read the the entries in a directory.
+/// Reads the the entries in a directory.
///
/// This struct is returned from the [`read_dir`] function of this module and
/// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information
@@ -287,7 +287,7 @@ impl DirEntry {
asyncify(move || std.file_type()).await
}
- /// Returns a reference to the underlying `std::fs::DirEntry`
+ /// Returns a reference to the underlying `std::fs::DirEntry`.
#[cfg(unix)]
pub(super) fn as_inner(&self) -> &std::fs::DirEntry {
&self.0
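`read_dir` returns a `ReadDir` that is driven with `next_entry` rather than a blocking iterator; a minimal sketch listing the current directory:

```rust
use tokio::fs;

// Sketch: walk the directory entries one at a time with `next_entry`.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut entries = fs::read_dir(".").await?;
    while let Some(entry) = entries.next_entry().await? {
        println!("{}", entry.path().display());
    }
    Ok(())
}
```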
diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs
index 1e083ad..486efbe 100644
--- a/src/future/maybe_done.rs
+++ b/src/future/maybe_done.rs
@@ -1,4 +1,4 @@
-//! Definition of the MaybeDone combinator
+//! Definition of the MaybeDone combinator.
use std::future::Future;
use std::mem;
@@ -8,9 +8,9 @@ use std::task::{Context, Poll};
/// A future that may have completed.
#[derive(Debug)]
pub enum MaybeDone<Fut: Future> {
- /// A not-yet-completed future
+ /// A not-yet-completed future.
Future(Fut),
- /// The output of the completed future
+ /// The output of the completed future.
Done(Fut::Output),
/// The empty variant after the result of a [`MaybeDone`] has been
/// taken using the [`take_output`](MaybeDone::take_output) method.
@@ -20,7 +20,7 @@ pub enum MaybeDone<Fut: Future> {
// Safe because we never generate `Pin<&mut Fut::Output>`
impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}
-/// Wraps a future into a `MaybeDone`
+/// Wraps a future into a `MaybeDone`.
pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
MaybeDone::Future(future)
}
diff --git a/src/future/poll_fn.rs b/src/future/poll_fn.rs
index 0169bd5..d82ce89 100644
--- a/src/future/poll_fn.rs
+++ b/src/future/poll_fn.rs
@@ -1,6 +1,6 @@
#![allow(dead_code)]
-//! Definition of the `PollFn` adapter combinator
+//! Definition of the `PollFn` adapter combinator.
use std::fmt;
use std::future::Future;
diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs
index fa5bec5..9ec5b7f 100644
--- a/src/io/async_fd.rs
+++ b/src/io/async_fd.rs
@@ -205,13 +205,13 @@ impl<T: AsRawFd> AsyncFd<T> {
})
}
- /// Returns a shared reference to the backing object of this [`AsyncFd`]
+ /// Returns a shared reference to the backing object of this [`AsyncFd`].
#[inline]
pub fn get_ref(&self) -> &T {
self.inner.as_ref().unwrap()
}
- /// Returns a mutable reference to the backing object of this [`AsyncFd`]
+ /// Returns a mutable reference to the backing object of this [`AsyncFd`].
#[inline]
pub fn get_mut(&mut self) -> &mut T {
self.inner.as_mut().unwrap()
diff --git a/src/io/blocking.rs b/src/io/blocking.rs
index 94a3484..1d79ee7 100644
--- a/src/io/blocking.rs
+++ b/src/io/blocking.rs
@@ -16,7 +16,7 @@ use self::State::*;
pub(crate) struct Blocking<T> {
inner: Option<T>,
state: State<T>,
- /// `true` if the lower IO layer needs flushing
+ /// `true` if the lower IO layer needs flushing.
need_flush: bool,
}
@@ -175,7 +175,7 @@ where
}
}
-/// Repeats operations that are interrupted
+/// Repeats operations that are interrupted.
macro_rules! uninterruptibly {
($e:expr) => {{
loop {
diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs
index a765d76..f1ac4b2 100644
--- a/src/io/bsd/poll_aio.rs
+++ b/src/io/bsd/poll_aio.rs
@@ -1,4 +1,4 @@
-//! Use POSIX AIO futures with Tokio
+//! Use POSIX AIO futures with Tokio.
use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
use mio::event::Source;
@@ -16,14 +16,14 @@ use std::task::{Context, Poll};
/// Tokio's consumer must pass an implementor of this trait to create a
/// [`Aio`] object.
pub trait AioSource {
- /// Register this AIO event source with Tokio's reactor
+ /// Registers this AIO event source with Tokio's reactor.
fn register(&mut self, kq: RawFd, token: usize);
- /// Deregister this AIO event source with Tokio's reactor
+ /// Deregisters this AIO event source with Tokio's reactor.
fn deregister(&mut self);
}
-/// Wrap the user's AioSource in order to implement mio::event::Source, which
+/// Wraps the user's AioSource in order to implement mio::event::Source, which
/// is what the rest of the crate wants.
struct MioSource<T>(T);
diff --git a/src/io/driver/interest.rs b/src/io/driver/interest.rs
index c5b18ed..d6b46df 100644
--- a/src/io/driver/interest.rs
+++ b/src/io/driver/interest.rs
@@ -5,7 +5,7 @@ use crate::io::driver::Ready;
use std::fmt;
use std::ops;
-/// Readiness event interest
+/// Readiness event interest.
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
@@ -17,19 +17,19 @@ impl Interest {
// The non-FreeBSD definitions in this block are active only when
// building documentation.
cfg_aio! {
- /// Interest for POSIX AIO
+ /// Interest for POSIX AIO.
#[cfg(target_os = "freebsd")]
pub const AIO: Interest = Interest(mio::Interest::AIO);
- /// Interest for POSIX AIO
+ /// Interest for POSIX AIO.
#[cfg(not(target_os = "freebsd"))]
pub const AIO: Interest = Interest(mio::Interest::READABLE);
- /// Interest for POSIX AIO lio_listio events
+ /// Interest for POSIX AIO lio_listio events.
#[cfg(target_os = "freebsd")]
pub const LIO: Interest = Interest(mio::Interest::LIO);
- /// Interest for POSIX AIO lio_listio events
+ /// Interest for POSIX AIO lio_listio events.
#[cfg(not(target_os = "freebsd"))]
pub const LIO: Interest = Interest(mio::Interest::READABLE);
}
@@ -39,7 +39,7 @@ impl Interest {
/// Readable interest includes read-closed events.
pub const READABLE: Interest = Interest(mio::Interest::READABLE);
- /// Interest in all writable events
+ /// Interest in all writable events.
///
/// Writable interest includes write-closed events.
pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs
index 1511884..19f67a2 100644
--- a/src/io/driver/mod.rs
+++ b/src/io/driver/mod.rs
@@ -23,10 +23,10 @@ use std::io;
use std::sync::{Arc, Weak};
use std::time::Duration;
-/// I/O driver, backed by Mio
+/// I/O driver, backed by Mio.
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
- /// as it is mostly used to determine when to call `compact()`
+ /// as it is mostly used to determine when to call `compact()`.
tick: u8,
/// Reuse the `mio::Events` value across calls to poll.
@@ -35,17 +35,17 @@ pub(crate) struct Driver {
/// Primary slab handle containing the state for each resource registered
/// with this driver. During Drop this is moved into the Inner structure, so
/// this is an Option to allow it to be vacated (until Drop this is always
- /// Some)
+ /// Some).
resources: Option<Slab<ScheduledIo>>,
- /// The system event queue
+ /// The system event queue.
poll: mio::Poll,
/// State shared between the reactor and the handles.
inner: Arc<Inner>,
}
-/// A reference to an I/O driver
+/// A reference to an I/O driver.
#[derive(Clone)]
pub(crate) struct Handle {
inner: Weak<Inner>,
@@ -66,13 +66,13 @@ pub(super) struct Inner {
/// without risking new ones being registered in the meantime.
resources: Mutex<Option<Slab<ScheduledIo>>>,
- /// Registers I/O resources
+ /// Registers I/O resources.
registry: mio::Registry,
/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
- /// Used to wake up the reactor from a call to `turn`
+ /// Used to wake up the reactor from a call to `turn`.
waker: mio::Waker,
}
@@ -253,7 +253,7 @@ impl fmt::Debug for Driver {
cfg_rt! {
impl Handle {
- /// Returns a handle to the current reactor
+ /// Returns a handle to the current reactor.
///
/// # Panics
///
@@ -267,7 +267,7 @@ cfg_rt! {
cfg_not_rt! {
impl Handle {
- /// Returns a handle to the current reactor
+ /// Returns a handle to the current reactor.
///
/// # Panics
///
diff --git a/src/io/driver/ready.rs b/src/io/driver/ready.rs
index 305dc91..2430d30 100644
--- a/src/io/driver/ready.rs
+++ b/src/io/driver/ready.rs
@@ -68,7 +68,7 @@ impl Ready {
ready
}
- /// Returns true if `Ready` is the empty set
+ /// Returns true if `Ready` is the empty set.
///
/// # Examples
///
@@ -82,7 +82,7 @@ impl Ready {
self == Ready::EMPTY
}
- /// Returns `true` if the value includes `readable`
+ /// Returns `true` if the value includes `readable`.
///
/// # Examples
///
@@ -98,7 +98,7 @@ impl Ready {
self.contains(Ready::READABLE) || self.is_read_closed()
}
- /// Returns `true` if the value includes writable `readiness`
+ /// Returns `true` if the value includes writable `readiness`.
///
/// # Examples
///
@@ -114,7 +114,7 @@ impl Ready {
self.contains(Ready::WRITABLE) || self.is_write_closed()
}
- /// Returns `true` if the value includes read-closed `readiness`
+ /// Returns `true` if the value includes read-closed `readiness`.
///
/// # Examples
///
@@ -129,7 +129,7 @@ impl Ready {
self.contains(Ready::READ_CLOSED)
}
- /// Returns `true` if the value includes write-closed `readiness`
+ /// Returns `true` if the value includes write-closed `readiness`.
///
/// # Examples
///
@@ -154,7 +154,7 @@ impl Ready {
(self & other) == other
}
- /// Create a `Ready` instance using the given `usize` representation.
+ /// Creates a `Ready` instance using the given `usize` representation.
///
/// The `usize` representation must have been obtained from a call to
/// `Readiness::as_usize`.
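The `Ready` accessors documented above are typically used on the value returned by `ready()`; a minimal sketch, assuming a peer is listening on 127.0.0.1:8080 (address chosen for illustration):

```rust
use tokio::io::Interest;
use tokio::net::TcpStream;

// Sketch: inspect the `Ready` set returned by `TcpStream::ready`.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;

    let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
    println!("readable: {}", ready.is_readable());
    println!("writable: {}", ready.is_writable());
    Ok(())
}
```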
diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs
index a265720..76f9343 100644
--- a/src/io/driver/scheduled_io.rs
+++ b/src/io/driver/scheduled_io.rs
@@ -36,16 +36,16 @@ cfg_io_readiness! {
#[derive(Debug, Default)]
struct Waiters {
#[cfg(feature = "net")]
- /// List of all current waiters
+ /// List of all current waiters.
list: WaitList,
- /// Waker used for AsyncRead
+ /// Waker used for AsyncRead.
reader: Option<Waker>,
- /// Waker used for AsyncWrite
+ /// Waker used for AsyncWrite.
writer: Option<Waker>,
- /// True if this ScheduledIo has been killed due to IO driver shutdown
+ /// True if this ScheduledIo has been killed due to IO driver shutdown.
is_shutdown: bool,
}
@@ -54,19 +54,19 @@ cfg_io_readiness! {
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
- /// The waker for this task
+ /// The waker for this task.
waker: Option<Waker>,
- /// The interest this waiter is waiting on
+ /// The interest this waiter is waiting on.
interest: Interest,
is_ready: bool,
- /// Should never be `!Unpin`
+ /// Should never be `!Unpin`.
_p: PhantomPinned,
}
- /// Future returned by `readiness()`
+ /// Future returned by `readiness()`.
struct Readiness<'a> {
scheduled_io: &'a ScheduledIo,
@@ -276,7 +276,7 @@ impl ScheduledIo {
}
}
- /// Poll version of checking readiness for a certain direction.
+ /// Polls for readiness events in a given direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
/// which cannot use the `async fn` version. This uses reserved reader
@@ -363,7 +363,7 @@ unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
- /// An async version of `poll_readiness` which uses a linked list of wakers
+ /// An async version of `poll_readiness` which uses a linked list of wakers.
pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
self.readiness_fut(interest).await
}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index a5ee108..cfdda61 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -218,7 +218,7 @@ cfg_io_driver_impl! {
}
cfg_aio! {
- /// BSD-specific I/O types
+ /// BSD-specific I/O types.
pub mod bsd {
mod poll_aio;
diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs
index 9872574..44e68a2 100644
--- a/src/io/poll_evented.rs
+++ b/src/io/poll_evented.rs
@@ -113,7 +113,7 @@ impl<E: Source> PollEvented<E> {
})
}
- /// Returns a reference to the registration
+ /// Returns a reference to the registration.
#[cfg(any(
feature = "net",
all(unix, feature = "process"),
@@ -123,7 +123,7 @@ impl<E: Source> PollEvented<E> {
&self.registration
}
- /// Deregister the inner io from the registration and returns a Result containing the inner io
+ /// Deregisters the inner io from the registration and returns a Result containing the inner io.
#[cfg(any(feature = "net", feature = "process"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
diff --git a/src/io/split.rs b/src/io/split.rs
index f35273f..8258a0f 100644
--- a/src/io/split.rs
+++ b/src/io/split.rs
@@ -90,7 +90,7 @@ impl<T> ReadHalf<T> {
}
impl<T> WriteHalf<T> {
- /// Check if this `WriteHalf` and some `ReadHalf` were split from the same
+ /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same
/// stream.
pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs
index 56c4520..7e4a198 100644
--- a/src/io/stdio_common.rs
+++ b/src/io/stdio_common.rs
@@ -7,7 +7,7 @@ use std::task::{Context, Poll};
/// if buffer contents seems to be utf8. Otherwise it only trims buffer down to MAX_BUF.
/// That's why, wrapped writer will always receive well-formed utf-8 bytes.
/// # Other platforms
-/// passes data to `inner` as is
+/// Passes data to `inner` as is.
#[derive(Debug)]
pub(crate) struct SplitByUtf8BoundaryIfWindows<W> {
inner: W,
diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs
index a1f77f8..93a3183 100644
--- a/src/io/util/async_write_ext.rs
+++ b/src/io/util/async_write_ext.rs
@@ -20,7 +20,7 @@ use std::io::IoSlice;
use bytes::Buf;
cfg_io_util! {
- /// Defines numeric writer
+ /// Defines numeric writer.
macro_rules! write_impl {
(
$(
@@ -256,7 +256,7 @@ cfg_io_util! {
write_buf(self, src)
}
- /// Attempts to write an entire buffer into this writer
+ /// Attempts to write an entire buffer into this writer.
///
/// Equivalent to:
///
@@ -353,9 +353,9 @@ cfg_io_util! {
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
- /// let mut buffer = File::create("foo.txt").await?;
+ /// let mut file = File::create("foo.txt").await?;
///
- /// buffer.write_all(b"some bytes").await?;
+ /// file.write_all(b"some bytes").await?;
/// Ok(())
/// }
/// ```
diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs
index 7cfd46c..7df610b 100644
--- a/src/io/util/buf_reader.rs
+++ b/src/io/util/buf_reader.rs
@@ -155,7 +155,7 @@ pub(super) enum SeekState {
Pending,
}
-/// Seek to an offset, in bytes, in the underlying reader.
+/// Seeks to an offset, in bytes, in the underlying reader.
///
/// The position used for seeking with `SeekFrom::Current(_)` is the
/// position the underlying reader would be at if the `BufReader` had no
diff --git a/src/io/util/copy.rs b/src/io/util/copy.rs
index fbd77b5..d0ab7cb 100644
--- a/src/io/util/copy.rs
+++ b/src/io/util/copy.rs
@@ -23,7 +23,7 @@ impl CopyBuffer {
pos: 0,
cap: 0,
amt: 0,
- buf: vec![0; 2048].into_boxed_slice(),
+ buf: vec![0; super::DEFAULT_BUF_SIZE].into_boxed_slice(),
}
}
@@ -84,6 +84,14 @@ impl CopyBuffer {
}
}
+ // If pos larger than cap, this loop will never stop.
+ // In particular, user's wrong poll_write implementation returning
+ // incorrect written length may lead to thread blocking.
+ debug_assert!(
+ self.pos <= self.cap,
+ "writer returned length larger than input slice"
+ );
+
// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.pos == self.cap && self.read_done {
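The hunk above grows the copy buffer from 2048 bytes to `DEFAULT_BUF_SIZE` so it matches `std::io::copy` ([#4209]) and adds a debug assertion against writers that claim to have written more than they were given. Usage of `tokio::io::copy` itself is unchanged; a minimal sketch:

```rust
use tokio::io;

// Sketch: copy everything from an in-memory reader into a Vec<u8> writer.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut writer: Vec<u8> = vec![];

    io::copy(&mut reader, &mut writer).await?;
    assert_eq!(&b"hello"[..], &writer[..]);
    Ok(())
}
```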
diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs
index 3fbf5e3..717f633 100644
--- a/src/io/util/lines.rs
+++ b/src/io/util/lines.rs
@@ -8,7 +8,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
- /// Read lines from an [`AsyncBufRead`].
+ /// Reads lines from an [`AsyncBufRead`].
///
/// A `Lines` can be turned into a `Stream` with [`LinesStream`].
///
@@ -72,12 +72,12 @@ where
poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
}
- /// Obtain a mutable reference to the underlying reader
+ /// Obtains a mutable reference to the underlying reader.
pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}
- /// Obtain a reference to the underlying reader
+ /// Obtains a reference to the underlying reader.
pub fn get_ref(&mut self) -> &R {
&self.reader
}
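`Lines` is usually obtained through `AsyncBufReadExt::lines` and driven with `next_line`; a minimal sketch over an in-memory reader:

```rust
use tokio::io::{AsyncBufReadExt, BufReader};

// Sketch: read lines one at a time with `next_line`.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let reader = BufReader::new(&b"foo\nbar\n"[..]);
    let mut lines = reader.lines();

    while let Some(line) = lines.next_line().await? {
        println!("{}", line);
    }
    Ok(())
}
```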
diff --git a/src/lib.rs b/src/lib.rs
index 9689223..9821c1a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,6 +16,10 @@
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
#![cfg_attr(docsrs, feature(doc_cfg))]
+#![cfg_attr(docsrs, feature(doc_cfg_hide))]
+#![cfg_attr(docsrs, doc(cfg_hide(docsrs)))]
+#![cfg_attr(docsrs, doc(cfg_hide(loom)))]
+#![cfg_attr(docsrs, doc(cfg_hide(not(loom))))]
#![cfg_attr(docsrs, allow(unused_attributes))]
//! A runtime for writing reliable network applications without compromising speed.
@@ -161,8 +165,8 @@
//! [`tokio::runtime`]: crate::runtime
//! [`Builder`]: crate::runtime::Builder
//! [`Runtime`]: crate::runtime::Runtime
-//! [rt]: runtime/index.html#basic-scheduler
-//! [rt-multi-thread]: runtime/index.html#threaded-scheduler
+//! [rt]: runtime/index.html#current-thread-scheduler
+//! [rt-multi-thread]: runtime/index.html#multi-thread-scheduler
//! [rt-features]: runtime/index.html#runtime-scheduler
//!
//! ## CPU-bound tasks and blocking code
@@ -350,6 +354,19 @@
//!
//! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section
+// Test that pointer width is compatible. This asserts that e.g. usize is at
+// least 32 bits, which a lot of components in Tokio currently assumes.
+//
+// TODO: improve once we have MSRV access to const eval to make more flexible.
+#[cfg(not(any(
+ target_pointer_width = "32",
+ target_pointer_width = "64",
+ target_pointer_width = "128"
+)))]
+compile_error! {
+ "Tokio requires the platform pointer width to be 32, 64, or 128 bits"
+}
+
// Includes re-exports used by macros.
//
// This module is not intended to be part of the public API. In general, any
@@ -480,6 +497,12 @@ cfg_macros! {
#[doc(hidden)]
pub use tokio_macros::select_priv_declare_output_enum;
+ /// Implementation detail of the `select!` macro. This macro is **not**
+ /// intended to be used as part of the public API and is permitted to
+ /// change.
+ #[doc(hidden)]
+ pub use tokio_macros::select_priv_clean_pattern;
+
cfg_rt! {
#[cfg(feature = "rt-multi-thread")]
#[cfg(not(test))] // Work around for rust-lang/rust#62127
diff --git a/src/loom/std/mutex.rs b/src/loom/std/mutex.rs
index bf14d62..3f686e0 100644
--- a/src/loom/std/mutex.rs
+++ b/src/loom/std/mutex.rs
@@ -1,7 +1,7 @@
use std::sync::{self, MutexGuard, TryLockError};
/// Adapter for `std::Mutex` that removes the poisoning aspects
-// from its api
+/// from its api.
#[derive(Debug)]
pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>);
diff --git a/src/macros/cfg.rs b/src/macros/cfg.rs
index 193bcd7..606bce7 100644
--- a/src/macros/cfg.rs
+++ b/src/macros/cfg.rs
@@ -13,7 +13,7 @@ macro_rules! feature {
}
}
-/// Enables enter::block_on
+/// Enables enter::block_on.
macro_rules! cfg_block_on {
($($item:item)*) => {
$(
@@ -28,7 +28,7 @@ macro_rules! cfg_block_on {
}
}
-/// Enables internal `AtomicWaker` impl
+/// Enables internal `AtomicWaker` impl.
macro_rules! cfg_atomic_waker_impl {
($($item:item)*) => {
$(
@@ -99,6 +99,7 @@ macro_rules! cfg_io_driver_impl {
feature = "process",
all(unix, feature = "signal"),
))]
+ #[cfg_attr(docsrs, doc(cfg(all())))]
$item
)*
}
@@ -422,7 +423,8 @@ macro_rules! cfg_has_atomic_u64 {
#[cfg(not(any(
target_arch = "arm",
target_arch = "mips",
- target_arch = "powerpc"
+ target_arch = "powerpc",
+ target_arch = "riscv32"
)))]
$item
)*
@@ -435,7 +437,8 @@ macro_rules! cfg_not_has_atomic_u64 {
#[cfg(any(
target_arch = "arm",
target_arch = "mips",
- target_arch = "powerpc"
+ target_arch = "powerpc",
+ target_arch = "riscv32"
))]
$item
)*
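The two macros above pick between code that can rely on a native 64-bit atomic and code that cannot, with riscv32 now added to the fallback list ([#4185]). A simplified sketch of the kind of alternative the macros select between; the module and method names here are illustrative, not tokio's:

```rust
// Targets with native 64-bit atomics use `AtomicU64` directly.
#[cfg(not(any(target_arch = "arm", target_arch = "mips",
              target_arch = "powerpc", target_arch = "riscv32")))]
mod u64_counter {
    use std::sync::atomic::{AtomicU64, Ordering};
    pub struct Counter(AtomicU64);
    impl Counter {
        pub fn new() -> Self { Counter(AtomicU64::new(0)) }
        pub fn fetch_add_one(&self) -> u64 { self.0.fetch_add(1, Ordering::Relaxed) }
    }
}

// Targets without them fall back to a mutex-protected u64 with the same API.
#[cfg(any(target_arch = "arm", target_arch = "mips",
          target_arch = "powerpc", target_arch = "riscv32"))]
mod u64_counter {
    use std::sync::Mutex;
    pub struct Counter(Mutex<u64>);
    impl Counter {
        pub fn new() -> Self { Counter(Mutex::new(0)) }
        pub fn fetch_add_one(&self) -> u64 {
            let mut n = self.0.lock().unwrap();
            let prev = *n;
            *n += 1;
            prev
        }
    }
}

fn main() {
    let c = u64_counter::Counter::new();
    assert_eq!(c.fetch_add_one(), 0);
    assert_eq!(c.fetch_add_one(), 1);
}
```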
diff --git a/src/macros/join.rs b/src/macros/join.rs
index 5f37af5..f91b5f1 100644
--- a/src/macros/join.rs
+++ b/src/macros/join.rs
@@ -1,4 +1,4 @@
-/// Wait on multiple concurrent branches, returning when **all** branches
+/// Waits on multiple concurrent branches, returning when **all** branches
/// complete.
///
/// The `join!` macro must be used inside of async functions, closures, and
diff --git a/src/macros/mod.rs b/src/macros/mod.rs
index b0af521..a1839c8 100644
--- a/src/macros/mod.rs
+++ b/src/macros/mod.rs
@@ -15,6 +15,11 @@ mod ready;
#[macro_use]
mod thread_local;
+cfg_trace! {
+ #[macro_use]
+ mod trace;
+}
+
#[macro_use]
#[cfg(feature = "rt")]
pub(crate) mod scoped_tls;
diff --git a/src/macros/scoped_tls.rs b/src/macros/scoped_tls.rs
index a00aae2..f2504cb 100644
--- a/src/macros/scoped_tls.rs
+++ b/src/macros/scoped_tls.rs
@@ -3,7 +3,7 @@ use crate::loom::thread::LocalKey;
use std::cell::Cell;
use std::marker;
-/// Set a reference as a thread-local
+/// Sets a reference as a thread-local.
macro_rules! scoped_thread_local {
($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => (
$(#[$attrs])*
diff --git a/src/macros/select.rs b/src/macros/select.rs
index a90ee9e..051f8cb 100644
--- a/src/macros/select.rs
+++ b/src/macros/select.rs
@@ -1,4 +1,4 @@
-/// Wait on multiple concurrent branches, returning when the **first** branch
+/// Waits on multiple concurrent branches, returning when the **first** branch
/// completes, cancelling the remaining branches.
///
/// The `select!` macro must be used inside of async functions, closures, and
@@ -502,7 +502,7 @@ macro_rules! select {
let mut fut = unsafe { Pin::new_unchecked(fut) };
// Try polling it
- let out = match fut.poll(cx) {
+ let out = match Future::poll(fut, cx) {
Ready(out) => out,
Pending => {
// Track that at least one future is
@@ -520,7 +520,7 @@ macro_rules! select {
#[allow(unused_variables)]
#[allow(unused_mut)]
match &out {
- $bind => {}
+ $crate::select_priv_clean_pattern!($bind) => {}
_ => continue,
}
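The two changes above (fully qualified `Future::poll` and the new `select_priv_clean_pattern!` pass over the branch pattern) are what let `mut` bindings in `select!` patterns compile ([#4211]). A minimal sketch of such a branch:

```rust
// Sketch: a `select!` branch whose pattern uses a `mut` binding, which
// previously triggered a compiler error.
async fn compute() -> Vec<u32> {
    vec![1, 2, 3]
}

#[tokio::main]
async fn main() {
    tokio::select! {
        mut data = compute() => {
            data.push(4);
            assert_eq!(data, vec![1, 2, 3, 4]);
        }
    }
}
```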
diff --git a/src/macros/trace.rs b/src/macros/trace.rs
new file mode 100644
index 0000000..31dde2f
--- /dev/null
+++ b/src/macros/trace.rs
@@ -0,0 +1,27 @@
+cfg_trace! {
+ macro_rules! trace_op {
+ ($name:literal, $readiness:literal, $parent:expr) => {
+ tracing::trace!(
+ target: "runtime::resource::poll_op",
+ parent: $parent,
+ op_name = $name,
+ is_ready = $readiness
+ );
+ }
+ }
+
+ macro_rules! trace_poll_op {
+ ($name:literal, $poll:expr, $parent:expr $(,)*) => {
+ match $poll {
+ std::task::Poll::Ready(t) => {
+ trace_op!($name, true, $parent);
+ std::task::Poll::Ready(t)
+ }
+ std::task::Poll::Pending => {
+ trace_op!($name, false, $parent);
+ return std::task::Poll::Pending;
+ }
+ }
+ };
+ }
+}
diff --git a/src/macros/try_join.rs b/src/macros/try_join.rs
index fa5850e..6d3a893 100644
--- a/src/macros/try_join.rs
+++ b/src/macros/try_join.rs
@@ -1,4 +1,4 @@
-/// Wait on multiple concurrent branches, returning when **all** branches
+/// Waits on multiple concurrent branches, returning when **all** branches
/// complete with `Ok(_)` or on the first `Err(_)`.
///
/// The `try_join!` macro must be used inside of async functions, closures, and
@@ -59,6 +59,45 @@
/// }
/// }
/// ```
+///
+/// Using `try_join!` with spawned tasks.
+///
+/// ```
+/// use tokio::task::JoinHandle;
+///
+/// async fn do_stuff_async() -> Result<(), &'static str> {
+/// // async work
+/// # Err("failed")
+/// }
+///
+/// async fn more_async_work() -> Result<(), &'static str> {
+/// // more here
+/// # Ok(())
+/// }
+///
+/// async fn flatten<T>(handle: JoinHandle<Result<T, &'static str>>) -> Result<T, &'static str> {
+/// match handle.await {
+/// Ok(Ok(result)) => Ok(result),
+/// Ok(Err(err)) => Err(err),
+/// Err(err) => Err("handling failed"),
+/// }
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let handle1 = tokio::spawn(do_stuff_async());
+/// let handle2 = tokio::spawn(more_async_work());
+/// match tokio::try_join!(flatten(handle1), flatten(handle2)) {
+/// Ok(val) => {
+/// // do something with the values
+/// }
+/// Err(err) => {
+/// println!("Failed with {}.", err);
+/// # assert_eq!(err, "failed");
+/// }
+/// }
+/// }
+/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
macro_rules! try_join {
diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs
index 86f0ec1..8aecb21 100644
--- a/src/net/tcp/listener.rs
+++ b/src/net/tcp/listener.rs
@@ -227,7 +227,7 @@ impl TcpListener {
Ok(TcpListener { io })
}
- /// Turn a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`].
+ /// Turns a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`].
///
/// The returned [`std::net::TcpListener`] will have nonblocking mode set as
/// `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
diff --git a/src/net/tcp/mod.rs b/src/net/tcp/mod.rs
index 7f0f6d9..cb8a8b2 100644
--- a/src/net/tcp/mod.rs
+++ b/src/net/tcp/mod.rs
@@ -1,4 +1,4 @@
-//! TCP utility types
+//! TCP utility types.
pub(crate) mod listener;
diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs
index 02cb637..f54ff95 100644
--- a/src/net/tcp/socket.rs
+++ b/src/net/tcp/socket.rs
@@ -87,7 +87,7 @@ cfg_net! {
}
impl TcpSocket {
- /// Create a new socket configured for IPv4.
+ /// Creates a new socket configured for IPv4.
///
/// Calls `socket(2)` with `AF_INET` and `SOCK_STREAM`.
///
@@ -121,7 +121,7 @@ impl TcpSocket {
Ok(TcpSocket { inner })
}
- /// Create a new socket configured for IPv6.
+ /// Creates a new socket configured for IPv6.
///
/// Calls `socket(2)` with `AF_INET6` and `SOCK_STREAM`.
///
@@ -155,7 +155,7 @@ impl TcpSocket {
Ok(TcpSocket { inner })
}
- /// Allow the socket to bind to an in-use address.
+ /// Allows the socket to bind to an in-use address.
///
/// Behavior is platform specific. Refer to the target platform's
/// documentation for more details.
@@ -185,7 +185,7 @@ impl TcpSocket {
self.inner.set_reuseaddr(reuseaddr)
}
- /// Retrieves the value set for `SO_REUSEADDR` on this socket
+ /// Retrieves the value set for `SO_REUSEADDR` on this socket.
///
/// # Examples
///
@@ -211,7 +211,7 @@ impl TcpSocket {
self.inner.get_reuseaddr()
}
- /// Allow the socket to bind to an in-use port. Only available for unix systems
+ /// Allows the socket to bind to an in-use port. Only available for unix systems
/// (excluding Solaris & Illumos).
///
/// Behavior is platform specific. Refer to the target platform's
@@ -245,7 +245,7 @@ impl TcpSocket {
self.inner.set_reuseport(reuseport)
}
- /// Allow the socket to bind to an in-use port. Only available for unix systems
+ /// Allows the socket to bind to an in-use port. Only available for unix systems
/// (excluding Solaris & Illumos).
///
/// Behavior is platform specific. Refer to the target platform's
@@ -348,7 +348,7 @@ impl TcpSocket {
self.inner.get_recv_buffer_size()
}
- /// Get the local address of this socket.
+ /// Gets the local address of this socket.
///
/// Will fail on windows if called before `bind`.
///
@@ -374,7 +374,7 @@ impl TcpSocket {
self.inner.get_localaddr()
}
- /// Bind the socket to the given address.
+ /// Binds the socket to the given address.
///
/// This calls the `bind(2)` operating-system function. Behavior is
/// platform specific. Refer to the target platform's documentation for more
@@ -406,7 +406,7 @@ impl TcpSocket {
self.inner.bind(addr)
}
- /// Establish a TCP connection with a peer at the specified socket address.
+ /// Establishes a TCP connection with a peer at the specified socket address.
///
/// The `TcpSocket` is consumed. Once the connection is established, a
/// connected [`TcpStream`] is returned. If the connection fails, the
@@ -443,7 +443,7 @@ impl TcpSocket {
TcpStream::connect_mio(mio).await
}
- /// Convert the socket into a `TcpListener`.
+ /// Converts the socket into a `TcpListener`.
///
/// `backlog` defines the maximum number of pending connections are queued
/// by the operating system at any given time. Connection are removed from
diff --git a/src/net/tcp/split.rs b/src/net/tcp/split.rs
index 8ae70ce..0e02928 100644
--- a/src/net/tcp/split.rs
+++ b/src/net/tcp/split.rs
@@ -9,14 +9,18 @@
//! level.
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::TcpStream;
use std::io;
-use std::net::Shutdown;
+use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
/// Borrowed read half of a [`TcpStream`], created by [`split`].
///
/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
@@ -49,7 +53,7 @@ pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
}
impl ReadHalf<'_> {
- /// Attempt to receive data on the socket, without removing that data from
+ /// Attempts to receive data on the socket, without removing that data from
/// the queue, registering the current task for wakeup if data is not yet
/// available.
///
@@ -134,6 +138,211 @@ impl ReadHalf<'_> {
let mut buf = ReadBuf::new(buf);
poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// This function is equivalent to [`TcpStream::ready`].
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// This function is also equivalent to [`TcpStream::ready`].
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.0.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.try_read(buf)
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.0.try_read_vectored(bufs)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.0.try_read_buf(buf)
+ }
+ }
+
+ /// Returns the remote address that this stream is connected to.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the local address that this stream is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
+}
+
+impl WriteHalf<'_> {
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// This function is equivalent to [`TcpStream::ready`].
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.0.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.0.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.0.try_write_vectored(bufs)
+ }
+
+ /// Returns the remote address that this stream is connected to.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the local address that this stream is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
}
impl AsyncRead for ReadHalf<'_> {
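The methods added above mirror `TcpStream`'s readiness API on the borrowed halves ([#4120]). A minimal sketch combining `writable`/`try_write` on the write half with `readable`/`try_read` on the read half, assuming a peer is listening on 127.0.0.1:8080 (address chosen for illustration):

```rust
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (read_half, write_half) = stream.split();

    println!("connected to {}", write_half.peer_addr()?);

    // Wait until the socket is writable, then try a non-blocking write.
    write_half.writable().await?;
    match write_half.try_write(b"ping") {
        Ok(n) => println!("wrote {} bytes", n),
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
        Err(e) => return Err(e),
    }

    // Wait until the socket is readable, then try a non-blocking read.
    read_half.readable().await?;
    let mut buf = [0u8; 1024];
    match read_half.try_read(&mut buf) {
        Ok(n) => println!("read {} bytes", n),
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
        Err(e) => return Err(e),
    }
    Ok(())
}
```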
diff --git a/src/net/tcp/split_owned.rs b/src/net/tcp/split_owned.rs
index 1bcb4f2..ef4e7b5 100644
--- a/src/net/tcp/split_owned.rs
+++ b/src/net/tcp/split_owned.rs
@@ -9,16 +9,20 @@
//! level.
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::TcpStream;
use std::error::Error;
-use std::net::Shutdown;
+use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{fmt, io};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
/// Owned read half of a [`TcpStream`], created by [`into_split`].
///
/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
@@ -189,6 +193,128 @@ impl OwnedReadHalf {
let mut buf = ReadBuf::new(buf);
poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// This function is equivalent to [`TcpStream::ready`].
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// This function is also equivalent to [`TcpStream::readable`].
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.inner.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.inner.try_read(buf)
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.inner.try_read_vectored(bufs)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.inner.try_read_buf(buf)
+ }
+ }
+
+ /// Returns the remote address that this stream is connected to.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the local address that this stream is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
}
impl AsyncRead for OwnedReadHalf {
@@ -211,13 +337,94 @@ impl OwnedWriteHalf {
reunite(other, self)
}
- /// Destroy the write half, but don't close the write half of the stream
+ /// Destroys the write half, but doesn't close the write half of the stream
/// until the read half is dropped. If the read half has already been
/// dropped, this closes the stream.
pub fn forget(mut self) {
self.shutdown_on_drop = false;
drop(self);
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// This function is equivalent to [`TcpStream::ready`].
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.inner.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.inner.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possibly being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.inner.try_write_vectored(bufs)
+ }
+
+ /// Returns the remote address that this stream is connected to.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the local address that this stream is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
}
impl Drop for OwnedWriteHalf {
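
The owned halves gain the same readiness API; a corresponding sketch using `into_split` with the new `readable()`/`try_read()` on `OwnedReadHalf` (the address is a placeholder):

```rust
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (read_half, _write_half) = stream.into_split();

    let mut buf = [0u8; 4096];
    loop {
        // Wait for read readiness on the owned half, then try a
        // non-blocking read; Ok(0) means the peer closed the connection.
        read_half.readable().await?;
        match read_half.try_read(&mut buf) {
            Ok(0) => break,
            Ok(n) => println!("read {} bytes", n),
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```
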
diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs
index 34ac6ee..60d20fd 100644
--- a/src/net/tcp/stream.rs
+++ b/src/net/tcp/stream.rs
@@ -192,7 +192,7 @@ impl TcpStream {
Ok(TcpStream { io })
}
- /// Turn a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
+ /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
///
/// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
/// Use [`set_nonblocking`] to change the blocking mode if needed.
@@ -350,7 +350,7 @@ impl TcpStream {
}
}
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same socket on a single
@@ -422,7 +422,7 @@ impl TcpStream {
Ok(event.ready)
}
- /// Wait for the socket to become readable.
+ /// Waits for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
@@ -510,7 +510,7 @@ impl TcpStream {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}
- /// Try to read data from the stream into the provided buffer, returning how
+ /// Tries to read data from the stream into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
@@ -577,7 +577,7 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
- /// Try to read data from the stream into the provided buffers, returning
+ /// Tries to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
@@ -656,7 +656,7 @@ impl TcpStream {
}
cfg_io_util! {
- /// Try to read data from the stream into the provided buffer, advancing the
+ /// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
@@ -734,7 +734,7 @@ impl TcpStream {
}
}
- /// Wait for the socket to become writable.
+ /// Waits for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
@@ -874,7 +874,7 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
- /// Try to write several buffers to the stream, returning how many bytes
+ /// Tries to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
@@ -936,7 +936,7 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
}
- /// Try to read or write from the socket using a user-provided IO operation.
+ /// Tries to read or write from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The closure
/// should attempt to perform IO operation from the socket by manually
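
The `ready()` method documented above accepts a combined interest, which is how a single task can service both directions without splitting the stream. A rough sketch (placeholder address; error handling trimmed to the readiness pattern):

```rust
use tokio::io::Interest;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8080").await?;

    loop {
        // One await covers both directions; inspect the returned `Ready`
        // value to see which operations may make progress right now.
        let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;

        if ready.is_readable() {
            let mut buf = [0u8; 1024];
            match stream.try_read(&mut buf) {
                Ok(0) => break, // peer closed the connection
                Ok(n) => println!("read {} bytes", n),
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(e) => return Err(e),
            }
        }

        if ready.is_writable() {
            match stream.try_write(b"ping") {
                Ok(_) => break, // wrote our one message; done for this sketch
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
                Err(e) => return Err(e),
            }
        }
    }
    Ok(())
}
```
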
diff --git a/src/net/udp.rs b/src/net/udp.rs
index 75cc6f3..504d74e 100644
--- a/src/net/udp.rs
+++ b/src/net/udp.rs
@@ -12,7 +12,7 @@ cfg_io_util! {
}
cfg_net! {
- /// A UDP socket
+ /// A UDP socket.
///
/// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket`
/// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`:
@@ -211,7 +211,7 @@ impl UdpSocket {
UdpSocket::new(io)
}
- /// Turn a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`].
+ /// Turns a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`].
///
/// The returned [`std::net::UdpSocket`] will have nonblocking mode set as
/// `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
@@ -317,7 +317,7 @@ impl UdpSocket {
}))
}
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_recv()` or `try_send()`. It
/// can be used to concurrently recv / send to the same socket on a single
@@ -388,7 +388,7 @@ impl UdpSocket {
Ok(event.ready)
}
- /// Wait for the socket to become writable.
+ /// Waits for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is
/// usually paired with `try_send()` or `try_send_to()`.
@@ -443,6 +443,39 @@ impl UdpSocket {
Ok(())
}
+ /// Polls for write/send readiness.
+ ///
+ /// If the UDP socket is not currently ready for sending, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the UDP
+ /// socket becomes ready for sending, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
+ /// the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
+ /// second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`writable`] is not feasible. Where possible, using [`writable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the UDP socket is not ready for writing.
+ /// * `Poll::Ready(Ok(()))` if the UDP socket is ready for writing.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`writable`]: method@Self::writable
+ pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_write_ready(cx).map_ok(|_| ())
+ }
+
/// Sends data on the socket to the remote address that the socket is
/// connected to.
///
@@ -516,7 +549,7 @@ impl UdpSocket {
.poll_write_io(cx, || self.io.send(buf))
}
- /// Try to send data on the socket to the remote address to which it is
+ /// Tries to send data on the socket to the remote address to which it is
/// connected.
///
/// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is
@@ -570,7 +603,7 @@ impl UdpSocket {
.try_io(Interest::WRITABLE, || self.io.send(buf))
}
- /// Wait for the socket to become readable.
+ /// Waits for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_recv()`.
@@ -630,6 +663,39 @@ impl UdpSocket {
Ok(())
}
+ /// Polls for read/receive readiness.
+ ///
+ /// If the UDP socket is not currently ready for receiving, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the udp
+ /// socket becomes ready for reading, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
+ /// `poll_peek`, only the `Waker` from the `Context` passed to the most
+ /// recent call is scheduled to receive a wakeup. (However,
+ /// `poll_send_ready` retains a second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`readable`] is not feasible. Where possible, using [`readable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the UDP socket is not ready for reading.
+ /// * `Poll::Ready(Ok(()))` if the UDP socket is ready for reading.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`readable`]: method@Self::readable
+ pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_read_ready(cx).map_ok(|_| ())
+ }
+
/// Receives a single datagram message on the socket from the remote address
/// to which it is connected. On success, returns the number of bytes read.
///
@@ -715,7 +781,7 @@ impl UdpSocket {
Poll::Ready(Ok(()))
}
- /// Try to receive a single datagram message on the socket from the remote
+ /// Tries to receive a single datagram message on the socket from the remote
/// address to which it is connected. On success, returns the number of
/// bytes read.
///
@@ -772,7 +838,7 @@ impl UdpSocket {
}
cfg_io_util! {
- /// Try to receive data from the stream into the provided buffer, advancing the
+ /// Tries to receive data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// The function must be called with valid byte array buf of sufficient size
@@ -837,7 +903,7 @@ impl UdpSocket {
})
}
- /// Try to receive a single datagram message on the socket. On success,
+ /// Tries to receive a single datagram message on the socket. On success,
/// returns the number of bytes read and the origin.
///
/// The function must be called with valid byte array buf of sufficient size
@@ -978,7 +1044,7 @@ impl UdpSocket {
.poll_write_io(cx, || self.io.send_to(buf, target))
}
- /// Try to send data on the socket to the given address, but if the send is
+ /// Tries to send data on the socket to the given address, but if the send is
/// blocked this will return right away.
///
/// This function is usually paired with `writable()`.
@@ -1116,7 +1182,7 @@ impl UdpSocket {
Poll::Ready(Ok(addr))
}
- /// Try to receive a single datagram message on the socket. On success,
+ /// Tries to receive a single datagram message on the socket. On success,
/// returns the number of bytes read and the origin.
///
/// The function must be called with valid byte array buf of sufficient size
@@ -1170,7 +1236,7 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv_from(buf))
}
- /// Try to read or write from the socket using a user-provided IO operation.
+ /// Tries to read or write from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The closure
/// should attempt to perform IO operation from the socket by manually
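
`poll_send_ready()` and `poll_recv_ready()` are the manual-polling counterparts of `writable()`/`readable()`. One way to exercise them without writing a `Future` by hand is `std::future::poll_fn` (stable on recent Rust; any equivalent `poll_fn` helper works). A sketch, with placeholder addresses:

```rust
use std::future::poll_fn;
use tokio::net::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    socket.connect("127.0.0.1:8080").await?;

    // Bridge the poll-style readiness API back into async/await.
    poll_fn(|cx| socket.poll_send_ready(cx)).await?;

    match socket.try_send(b"ping") {
        Ok(n) => println!("sent {} bytes", n),
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
            // Readiness was consumed elsewhere; a real caller would loop
            // back to poll_send_ready here.
        }
        Err(e) => return Err(e),
    }
    Ok(())
}
```
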
diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs
index 7874b8a..d5b6186 100644
--- a/src/net/unix/datagram/socket.rs
+++ b/src/net/unix/datagram/socket.rs
@@ -96,7 +96,7 @@ cfg_net_unix! {
}
impl UnixDatagram {
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_recv()` or `try_send()`. It
/// can be used to concurrently recv / send to the same socket on a single
@@ -169,7 +169,7 @@ impl UnixDatagram {
Ok(event.ready)
}
- /// Wait for the socket to become writable.
+ /// Waits for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is
/// usually paired with `try_send()` or `try_send_to()`.
@@ -226,7 +226,40 @@ impl UnixDatagram {
Ok(())
}
- /// Wait for the socket to become readable.
+ /// Polls for write/send readiness.
+ ///
+ /// If the socket is not currently ready for sending, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the socket
+ /// becomes ready for sending, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
+ /// the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
+ /// second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`writable`] is not feasible. Where possible, using [`writable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready for writing.
+ /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`writable`]: method@Self::writable
+ pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_write_ready(cx).map_ok(|_| ())
+ }
+
+ /// Waits for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_recv()`.
@@ -289,6 +322,39 @@ impl UnixDatagram {
Ok(())
}
+ /// Polls for read/receive readiness.
+ ///
+ /// If the socket is not currently ready for receiving, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the
+ /// socket becomes ready for reading, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
+ /// `poll_peek`, only the `Waker` from the `Context` passed to the most
+ /// recent call is scheduled to receive a wakeup. (However,
+ /// `poll_send_ready` retains a second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`readable`] is not feasible. Where possible, using [`readable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready for reading.
+ /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`readable`]: method@Self::readable
+ pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_read_ready(cx).map_ok(|_| ())
+ }
+
/// Creates a new `UnixDatagram` bound to the specified path.
///
/// # Examples
@@ -397,7 +463,7 @@ impl UnixDatagram {
Ok(UnixDatagram { io })
}
- /// Turn a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
+ /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
///
/// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
/// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
@@ -548,7 +614,7 @@ impl UnixDatagram {
.await
}
- /// Try to send a datagram to the peer without waiting.
+ /// Tries to send a datagram to the peer without waiting.
///
/// # Examples
///
@@ -592,7 +658,7 @@ impl UnixDatagram {
.try_io(Interest::WRITABLE, || self.io.send(buf))
}
- /// Try to send a datagram to the peer without waiting.
+ /// Tries to send a datagram to the peer without waiting.
///
/// # Examples
///
@@ -678,7 +744,7 @@ impl UnixDatagram {
.await
}
- /// Try to receive a datagram from the peer without waiting.
+ /// Tries to receive a datagram from the peer without waiting.
///
/// # Examples
///
@@ -729,7 +795,7 @@ impl UnixDatagram {
}
cfg_io_util! {
- /// Try to receive data from the socket without waiting.
+ /// Tries to receive data from the socket without waiting.
///
/// # Examples
///
@@ -790,7 +856,7 @@ impl UnixDatagram {
Ok((n, SocketAddr(addr)))
}
- /// Try to read data from the stream into the provided buffer, advancing the
+ /// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// # Examples
@@ -1091,7 +1157,7 @@ impl UnixDatagram {
Poll::Ready(Ok(()))
}
- /// Try to receive data from the socket without waiting.
+ /// Tries to receive data from the socket without waiting.
///
/// # Examples
///
@@ -1143,7 +1209,7 @@ impl UnixDatagram {
Ok((n, SocketAddr(addr)))
}
- /// Try to read or write from the socket using a user-provided IO operation.
+ /// Tries to read or write from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The closure
/// should attempt to perform IO operation from the socket by manually
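
`UnixDatagram` gets the same treatment. A small unix-only sketch of the wait-then-try pattern using an unnamed socket pair, so nothing touches the filesystem:

```rust
use tokio::net::UnixDatagram;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (tx, rx) = UnixDatagram::pair()?;

    // Wait for send readiness, then attempt a non-blocking send.
    tx.writable().await?;
    match tx.try_send(b"hello") {
        Ok(n) => println!("sent {} bytes", n),
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
        Err(e) => return Err(e),
    }

    // Same pattern on the receiving end.
    rx.readable().await?;
    let mut buf = [0u8; 64];
    match rx.try_recv(&mut buf) {
        Ok(n) => println!("received {} bytes", n),
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
        Err(e) => return Err(e),
    }
    Ok(())
}
```
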
diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs
index efb9503..1785f8b 100644
--- a/src/net/unix/listener.rs
+++ b/src/net/unix/listener.rs
@@ -88,7 +88,7 @@ impl UnixListener {
Ok(UnixListener { io })
}
- /// Turn a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`].
+ /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`].
///
/// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode
/// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
diff --git a/src/net/unix/mod.rs b/src/net/unix/mod.rs
index c3046f1..14cb456 100644
--- a/src/net/unix/mod.rs
+++ b/src/net/unix/mod.rs
@@ -1,4 +1,4 @@
-//! Unix domain socket utility types
+//! Unix domain socket utility types.
// This module does not currently provide any public API, but it was
// unintentionally defined as a public module. Hide it from the documentation
diff --git a/src/net/unix/split.rs b/src/net/unix/split.rs
index 97214f7..d4686c2 100644
--- a/src/net/unix/split.rs
+++ b/src/net/unix/split.rs
@@ -8,14 +8,19 @@
//! split has no associated overhead and enforces all invariants at the type
//! level.
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::UnixStream;
+use crate::net::unix::SocketAddr;
use std::io;
use std::net::Shutdown;
use std::pin::Pin;
use std::task::{Context, Poll};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
/// Borrowed read half of a [`UnixStream`], created by [`split`].
///
/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
@@ -47,6 +52,206 @@ pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
(ReadHalf(stream), WriteHalf(stream))
}
+impl ReadHalf<'_> {
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.0.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.try_read(buf)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.0.try_read_buf(buf)
+ }
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.0.try_read_vectored(bufs)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
+}
+
+impl WriteHalf<'_> {
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.0.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.0.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possibly being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.0.try_write_vectored(buf)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
+}
+
impl AsyncRead for ReadHalf<'_> {
fn poll_read(
self: Pin<&mut Self>,
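
Of the methods added to the borrowed unix halves, `try_read_buf()` (behind the `io-util` feature) is the one with slightly different semantics: it appends to a `BufMut` and advances its cursor. A unix-only sketch using a socket pair so the loop actually terminates:

```rust
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (mut local, mut remote) = UnixStream::pair()?;
    remote.write_all(b"hello from the other half").await?;
    drop(remote); // closing the peer lets the read loop observe Ok(0)

    let (read_half, _write_half) = local.split();

    let mut buf = Vec::with_capacity(4096);
    loop {
        read_half.readable().await?;
        // try_read_buf appends to the Vec, so partial reads accumulate
        // across iterations of the loop.
        match read_half.try_read_buf(&mut buf) {
            Ok(0) => break, // read half closed
            Ok(n) => println!("read {} bytes", n),
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
    println!("total buffered: {} bytes", buf.len());
    Ok(())
}
```
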
diff --git a/src/net/unix/split_owned.rs b/src/net/unix/split_owned.rs
index 3d6ac6a..9c3a2a4 100644
--- a/src/net/unix/split_owned.rs
+++ b/src/net/unix/split_owned.rs
@@ -8,9 +8,10 @@
//! split has no associated overhead and enforces all invariants at the type
//! level.
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::UnixStream;
+use crate::net::unix::SocketAddr;
use std::error::Error;
use std::net::Shutdown;
use std::pin::Pin;
@@ -18,6 +19,10 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::{fmt, io};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
/// Owned read half of a [`UnixStream`], created by [`into_split`].
///
/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
@@ -102,6 +107,124 @@ impl OwnedReadHalf {
pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
reunite(self, other)
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.inner.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.inner.try_read(buf)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.inner.try_read_buf(buf)
+ }
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.inner.try_read_vectored(bufs)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
}
impl AsyncRead for OwnedReadHalf {
@@ -124,13 +247,92 @@ impl OwnedWriteHalf {
reunite(other, self)
}
- /// Destroy the write half, but don't close the write half of the stream
+ /// Destroys the write half, but doesn't close the write half of the stream
/// until the read half is dropped. If the read half has already been
/// dropped, this closes the stream.
pub fn forget(mut self) {
self.shutdown_on_drop = false;
drop(self);
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.inner.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.inner.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possibly being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.inner.try_write_vectored(buf)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
}
impl Drop for OwnedWriteHalf {
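
The vectored writer on the owned unix write half follows the same readiness pattern; a unix-only sketch with `IoSlice` buffers:

```rust
use std::io::IoSlice;
use tokio::net::UnixStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (local, _remote) = UnixStream::pair()?;
    let (_read_half, write_half) = local.into_split();

    let bufs = [IoSlice::new(b"hello "), IoSlice::new(b"world")];

    // Wait for write readiness, then attempt a vectored write; like
    // try_write, the call may consume only part of the buffers.
    write_half.writable().await?;
    match write_half.try_write_vectored(&bufs) {
        Ok(n) => println!("wrote {} bytes", n),
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
        Err(e) => return Err(e),
    }
    Ok(())
}
```
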
diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs
index 5837f36..4e7ef87 100644
--- a/src/net/unix/stream.rs
+++ b/src/net/unix/stream.rs
@@ -59,7 +59,7 @@ impl UnixStream {
Ok(stream)
}
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same socket on a single
@@ -133,7 +133,7 @@ impl UnixStream {
Ok(event.ready)
}
- /// Wait for the socket to become readable.
+ /// Waits for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
@@ -290,7 +290,7 @@ impl UnixStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
- /// Try to read data from the stream into the provided buffers, returning
+ /// Tries to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
@@ -369,7 +369,7 @@ impl UnixStream {
}
cfg_io_util! {
- /// Try to read data from the stream into the provided buffer, advancing the
+ /// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
@@ -449,7 +449,7 @@ impl UnixStream {
}
}
- /// Wait for the socket to become writable.
+ /// Waits for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
@@ -535,7 +535,7 @@ impl UnixStream {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}
- /// Try to write a buffer to the stream, returning how many bytes were
+ /// Tries to write a buffer to the stream, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
@@ -591,7 +591,7 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
- /// Try to write several buffers to the stream, returning how many bytes
+ /// Tries to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
@@ -653,7 +653,7 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}
- /// Try to read or write from the socket using a user-provided IO operation.
+ /// Tries to read or write from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The closure
/// should attempt to perform IO operation from the socket by manually
@@ -709,7 +709,7 @@ impl UnixStream {
Ok(UnixStream { io })
}
- /// Turn a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
+ /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
///
/// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
/// mode set as `true`. Use [`set_nonblocking`] to change the blocking
@@ -773,11 +773,41 @@ impl UnixStream {
}
/// Returns the socket address of the local half of this connection.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// println!("{:?}", stream.local_addr()?);
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr().map(SocketAddr)
}
/// Returns the socket address of the remote half of this connection.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// println!("{:?}", stream.peer_addr()?);
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.io.peer_addr().map(SocketAddr)
}
@@ -804,7 +834,7 @@ impl UnixStream {
// These lifetime markers also appear in the generated documentation, and make
// it more clear that this is a *borrowed* split.
#[allow(clippy::needless_lifetimes)]
- /// Split a `UnixStream` into a read half and a write half, which can be used
+ /// Splits a `UnixStream` into a read half and a write half, which can be used
/// to read and write the stream concurrently.
///
/// This method is more efficient than [`into_split`], but the halves cannot be
diff --git a/src/net/unix/ucred.rs b/src/net/unix/ucred.rs
index 49c7142..865303b 100644
--- a/src/net/unix/ucred.rs
+++ b/src/net/unix/ucred.rs
@@ -1,13 +1,13 @@
use libc::{gid_t, pid_t, uid_t};
-/// Credentials of a process
+/// Credentials of a process.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct UCred {
- /// PID (process ID) of the process
+ /// PID (process ID) of the process.
pid: Option<pid_t>,
- /// UID (user ID) of the process
+ /// UID (user ID) of the process.
uid: uid_t,
- /// GID (group ID) of the process
+ /// GID (group ID) of the process.
gid: gid_t,
}
diff --git a/src/net/windows/named_pipe.rs b/src/net/windows/named_pipe.rs
index de6ab58..550fd4d 100644
--- a/src/net/windows/named_pipe.rs
+++ b/src/net/windows/named_pipe.rs
@@ -105,7 +105,7 @@ pub struct NamedPipeServer {
}
impl NamedPipeServer {
- /// Construct a new named pipe server from the specified raw handle.
+ /// Constructs a new named pipe server from the specified raw handle.
///
/// This function will consume ownership of the handle given, passing
/// responsibility for closing the handle to the returned object.
@@ -234,7 +234,7 @@ impl NamedPipeServer {
self.io.disconnect()
}
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same pipe on a single
@@ -301,7 +301,7 @@ impl NamedPipeServer {
Ok(event.ready)
}
- /// Wait for the pipe to become readable.
+ /// Waits for the pipe to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
@@ -383,7 +383,7 @@ impl NamedPipeServer {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}
- /// Try to read data from the pipe into the provided buffer, returning how
+ /// Tries to read data from the pipe into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the pipe but does not wait for new data
@@ -450,7 +450,7 @@ impl NamedPipeServer {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
- /// Try to read data from the pipe into the provided buffers, returning
+ /// Tries to read data from the pipe into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
@@ -528,7 +528,7 @@ impl NamedPipeServer {
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}
- /// Wait for the pipe to become writable.
+ /// Waits for the pipe to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
@@ -606,7 +606,7 @@ impl NamedPipeServer {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}
- /// Try to write a buffer to the pipe, returning how many bytes were
+ /// Tries to write a buffer to the pipe, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
@@ -662,7 +662,7 @@ impl NamedPipeServer {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
- /// Try to write several buffers to the pipe, returning how many bytes
+ /// Tries to write several buffers to the pipe, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
@@ -724,7 +724,7 @@ impl NamedPipeServer {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}
- /// Try to read or write from the socket using a user-provided IO operation.
+ /// Tries to read or write from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The closure
/// should attempt to perform IO operation from the socket by manually
@@ -846,7 +846,7 @@ pub struct NamedPipeClient {
}
impl NamedPipeClient {
- /// Construct a new named pipe client from the specified raw handle.
+ /// Constructs a new named pipe client from the specified raw handle.
///
/// This function will consume ownership of the handle given, passing
/// responsibility for closing the handle to the returned object.
@@ -896,7 +896,7 @@ impl NamedPipeClient {
unsafe { named_pipe_info(self.io.as_raw_handle()) }
}
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same pipe on a single
@@ -962,7 +962,7 @@ impl NamedPipeClient {
Ok(event.ready)
}
- /// Wait for the pipe to become readable.
+ /// Waits for the pipe to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
@@ -1043,7 +1043,7 @@ impl NamedPipeClient {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}
- /// Try to read data from the pipe into the provided buffer, returning how
+ /// Tries to read data from the pipe into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the pipe but does not wait for new data
@@ -1109,7 +1109,7 @@ impl NamedPipeClient {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
- /// Try to read data from the pipe into the provided buffers, returning
+ /// Tries to read data from the pipe into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
@@ -1186,7 +1186,7 @@ impl NamedPipeClient {
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}
- /// Wait for the pipe to become writable.
+ /// Waits for the pipe to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
@@ -1263,7 +1263,7 @@ impl NamedPipeClient {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}
- /// Try to write a buffer to the pipe, returning how many bytes were
+ /// Tries to write a buffer to the pipe, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
@@ -1318,7 +1318,7 @@ impl NamedPipeClient {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
- /// Try to write several buffers to the pipe, returning how many bytes
+ /// Tries to write several buffers to the pipe, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
@@ -1379,7 +1379,7 @@ impl NamedPipeClient {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}
- /// Try to read or write from the socket using a user-provided IO operation.
+ /// Tries to read or write from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The closure
/// should attempt to perform IO operation from the socket by manually
@@ -1882,7 +1882,7 @@ impl ServerOptions {
self
}
- /// Create the named pipe identified by `addr` for use as a server.
+ /// Creates the named pipe identified by `addr` for use as a server.
///
/// This uses the [`CreateNamedPipe`] function.
///
@@ -1913,7 +1913,7 @@ impl ServerOptions {
unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
}
- /// Create the named pipe identified by `addr` for use as a server.
+ /// Creates the named pipe identified by `addr` for use as a server.
///
/// This is the same as [`create`] except that it supports providing the raw
/// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
@@ -2042,7 +2042,7 @@ impl ClientOptions {
self
}
- /// Open the named pipe identified by `addr`.
+ /// Opens the named pipe identified by `addr`.
///
/// This opens the client using [`CreateFile`] with the
/// `dwCreationDisposition` option set to `OPEN_EXISTING`.
@@ -2099,7 +2099,7 @@ impl ClientOptions {
unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
}
- /// Open the named pipe identified by `addr`.
+ /// Opens the named pipe identified by `addr`.
///
/// This is the same as [`open`] except that it supports providing the raw
/// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
@@ -2201,7 +2201,7 @@ pub struct PipeInfo {
pub in_buffer_size: u32,
}
-/// Encode an address so that it is a null-terminated wide string.
+/// Encodes an address so that it is a null-terminated wide string.
fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
let len = addr.as_ref().encode_wide().count();
let mut vec = Vec::with_capacity(len + 1);
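
The named-pipe changes are the same doc-verb pass, but since the server/client handshake is easy to get wrong, here is a Windows-only sketch of the create/open/connect flow these types document (the pipe name is a placeholder):

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};

const PIPE_NAME: &str = r"\\.\pipe\tokio-doc-example";

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // The server instance must exist before a client can open it.
    let mut server = ServerOptions::new().create(PIPE_NAME)?;

    let client_task = tokio::spawn(async move {
        let mut client = ClientOptions::new().open(PIPE_NAME)?;
        client.write_all(b"ping").await?;
        Ok::<_, std::io::Error>(())
    });

    server.connect().await?; // wait for the client to attach
    let mut buf = [0u8; 4];
    server.read_exact(&mut buf).await?;
    println!("server got {:?}", &buf);

    client_task.await.unwrap()?;
    Ok(())
}
```
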
diff --git a/src/park/mod.rs b/src/park/mod.rs
index edd9371..87d04ff 100644
--- a/src/park/mod.rs
+++ b/src/park/mod.rs
@@ -45,12 +45,12 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
-/// Block the current thread.
+/// Blocks the current thread.
pub(crate) trait Park {
/// Unpark handle type for the `Park` implementation.
type Unpark: Unpark;
- /// Error returned by `park`
+ /// Error returned by `park`.
type Error: Debug;
/// Gets a new `Unpark` handle associated with this `Park` instance.
@@ -66,7 +66,7 @@ pub(crate) trait Park {
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
- /// `Park` implementation
+ /// `Park` implementation.
fn park(&mut self) -> Result<(), Self::Error>;
/// Parks the current thread for at most `duration`.
@@ -82,10 +82,10 @@ pub(crate) trait Park {
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
- /// `Park` implementation
+ /// `Park` implementation.
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
- /// Release all resources holded by the parker for proper leak-free shutdown
+ /// Releases all resources held by the parker for proper leak-free shutdown.
fn shutdown(&mut self);
}
@@ -100,7 +100,7 @@ pub(crate) trait Unpark: Sync + Send + 'static {
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
- /// `Unpark` implementation
+ /// `Unpark` implementation.
fn unpark(&self);
}
diff --git a/src/park/thread.rs b/src/park/thread.rs
index 2725e45..27ce202 100644
--- a/src/park/thread.rs
+++ b/src/park/thread.rs
@@ -76,7 +76,7 @@ impl Park for ParkThread {
// ==== impl Inner ====
impl Inner {
- /// Park the current thread for at most `dur`.
+ /// Parks the current thread for at most `dur`.
fn park(&self) {
// If we were previously notified then we consume this notification and
// return quickly.
@@ -227,7 +227,7 @@ pub(crate) struct CachedParkThread {
}
impl CachedParkThread {
- /// Create a new `ParkThread` handle for the current thread.
+ /// Creates a new `ParkThread` handle for the current thread.
///
/// This type cannot be moved to other threads, so it should be created on
/// the thread that the caller intends to park.
@@ -241,7 +241,7 @@ impl CachedParkThread {
self.with_current(|park_thread| park_thread.unpark())
}
- /// Get a reference to the `ParkThread` handle for this thread.
+ /// Gets a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
where
F: FnOnce(&ParkThread) -> R,
diff --git a/src/process/mod.rs b/src/process/mod.rs
index 7a15024..6eeefdb 100644
--- a/src/process/mod.rs
+++ b/src/process/mod.rs
@@ -578,7 +578,7 @@ impl Command {
self
}
- /// Set executable argument
+ /// Sets the executable argument.
///
/// Set the first process argument, `argv[0]`, to something other than the
/// default executable path.
@@ -1173,7 +1173,7 @@ pub struct ChildStderr {
}
impl ChildStdin {
- /// Create an asynchronous `ChildStdin` from a synchronous one.
+ /// Creates an asynchronous `ChildStdin` from a synchronous one.
///
/// # Errors
///
@@ -1188,7 +1188,7 @@ impl ChildStdin {
}
impl ChildStdout {
- /// Create an asynchronous `ChildStderr` from a synchronous one.
+ /// Creates an asynchronous `ChildStdout` from a synchronous one.
///
/// # Errors
///
@@ -1203,7 +1203,7 @@ impl ChildStdout {
}
impl ChildStderr {
- /// Create an asynchronous `ChildStderr` from a synchronous one.
+ /// Creates an asynchronous `ChildStderr` from a synchronous one.
///
/// # Errors
///
diff --git a/src/process/unix/driver.rs b/src/process/unix/driver.rs
index 43b2efa..84dc8fb 100644
--- a/src/process/unix/driver.rs
+++ b/src/process/unix/driver.rs
@@ -1,6 +1,6 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
-//! Process driver
+//! Process driver.
use crate::park::Park;
use crate::process::unix::GlobalOrphanQueue;
diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs
index 0f379c9..576fe6c 100644
--- a/src/process/unix/mod.rs
+++ b/src/process/unix/mod.rs
@@ -1,4 +1,4 @@
-//! Unix handling of child processes
+//! Unix handling of child processes.
//!
//! Right now the only "fancy" thing about this is how we implement the
//! `Future` implementation on `Child` to get the exit status. Unix offers
diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs
index e37d872..872d0d5 100644
--- a/src/runtime/basic_scheduler.rs
+++ b/src/runtime/basic_scheduler.rs
@@ -2,6 +2,7 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
+use crate::runtime::context::EnterGuard;
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::Callback;
@@ -12,7 +13,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::sync::atomic::Ordering::{AcqRel, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::time::Duration;
@@ -29,6 +30,12 @@ pub(crate) struct BasicScheduler<P: Park> {
/// Sendable task spawner
spawner: Spawner,
+
+ /// This is usually None, but right before dropping the BasicScheduler, it
+ /// is changed to `Some` with the context being the runtime's own context.
+ /// This ensures that any tasks dropped in the `BasicScheduler`'s destructor
+ /// run in that runtime's context.
+ context_guard: Option<EnterGuard>,
}
/// The inner scheduler that owns the task queue and the main parker P.
@@ -160,6 +167,7 @@ impl<P: Park> BasicScheduler<P> {
inner,
notify: Notify::new(),
spawner,
+ context_guard: None,
}
}
@@ -210,22 +218,24 @@ impl<P: Park> BasicScheduler<P> {
basic_scheduler: self,
})
}
+
+ pub(super) fn set_context_guard(&mut self, guard: EnterGuard) {
+ self.context_guard = Some(guard);
+ }
}
impl<P: Park> Inner<P> {
- /// Block on the future provided and drive the runtime's driver.
+ /// Blocks on the provided future and drives the runtime's driver.
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
enter(self, |scheduler, context| {
let _enter = crate::runtime::enter(false);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
- let mut polled = false;
pin!(future);
'outer: loop {
- if scheduler.spawner.was_woken() || !polled {
- polled = true;
+ if scheduler.spawner.reset_woken() {
scheduler.stats.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
@@ -301,8 +311,8 @@ impl<P: Park> Inner<P> {
}
}
-/// Enter the scheduler context. This sets the queue and other necessary
-/// scheduler state in the thread-local
+/// Enters the scheduler context. This sets the queue and other necessary
+/// scheduler state in the thread-local.
fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
F: FnOnce(&mut Inner<P>, &Context) -> R,
@@ -418,13 +428,15 @@ impl Spawner {
}
fn waker_ref(&self) -> WakerRef<'_> {
- // clear the woken bit
- self.shared.woken.swap(false, AcqRel);
+ // Set woken to true when entering block_on so the outer future
+ // is polled on the first iteration of the loop.
+ self.shared.woken.store(true, Release);
waker_ref(&self.shared)
}
- fn was_woken(&self) -> bool {
- self.shared.woken.load(Acquire)
+ // Resets woken to false and returns the original value.
+ pub(crate) fn reset_woken(&self) -> bool {
+ self.shared.woken.swap(false, AcqRel)
}
}
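
The basic_scheduler change above replaces the `was_woken`/`polled` pair with a single consumed flag: it is pre-set to `true` when entering `block_on` and swapped back to `false` before each poll, so the outer future is polled exactly when it needs to be. A minimal, self-contained sketch of that pattern, using plain std atomics rather than tokio's actual `Spawner`/`Shared` types:

```rust
use std::sync::atomic::{
    AtomicBool,
    Ordering::{AcqRel, Release},
};

/// Single consumed wakeup flag, mirroring the scheduler's `woken` bit.
struct Woken(AtomicBool);

impl Woken {
    /// Called on entering block_on: guarantees the first poll happens.
    fn arm(&self) {
        self.0.store(true, Release);
    }

    /// Called by the waker when the outer future is woken.
    fn wake(&self) {
        self.0.store(true, Release);
    }

    /// Called before polling: consumes the notification and reports
    /// whether a poll is actually needed.
    fn reset_woken(&self) -> bool {
        self.0.swap(false, AcqRel)
    }
}

fn main() {
    let woken = Woken(AtomicBool::new(false));
    woken.arm();
    assert!(woken.reset_woken());  // first poll happens
    assert!(!woken.reset_woken()); // no spurious re-poll
    woken.wake();
    assert!(woken.reset_woken());  // polled again only after a wakeup
}
```

Because `swap` reads and clears the flag in one atomic step, a wakeup that races with the check is never lost: either this `reset_woken` call observes it, or the next one does.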
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
index 0c23bb0..77ab495 100644
--- a/src/runtime/blocking/pool.rs
+++ b/src/runtime/blocking/pool.rs
@@ -8,7 +8,6 @@ use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
-use crate::util::error::CONTEXT_MISSING_ERROR;
use std::collections::{HashMap, VecDeque};
use std::fmt;
@@ -25,28 +24,28 @@ pub(crate) struct Spawner {
}
struct Inner {
- /// State shared between worker threads
+ /// State shared between worker threads.
shared: Mutex<Shared>,
/// Pool threads wait on this.
condvar: Condvar,
- /// Spawned threads use this name
+ /// Spawned threads use this name.
thread_name: ThreadNameFn,
- /// Spawned thread stack size
+ /// Spawned thread stack size.
stack_size: Option<usize>,
- /// Call after a thread starts
+ /// Call after a thread starts.
after_start: Option<Callback>,
- /// Call before a thread stops
+ /// Call before a thread stops.
before_stop: Option<Callback>,
- // Maximum number of threads
+ // Maximum number of threads.
thread_cap: usize,
- // Customizable wait timeout
+ // Customizable wait timeout.
keep_alive: Duration,
}
@@ -67,7 +66,7 @@ struct Shared {
/// calling shutdown handles joining on these.
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
/// This is a counter used to iterate worker_threads in a consistent order (for loom's
- /// benefit)
+ /// benefit).
worker_thread_index: usize,
}
@@ -75,13 +74,13 @@ type Task = task::UnownedTask<NoopSchedule>;
const KEEP_ALIVE: Duration = Duration::from_secs(10);
-/// Run the provided function on an executor dedicated to blocking operations.
+/// Runs the provided function on an executor dedicated to blocking operations.
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
- let rt = context::current().expect(CONTEXT_MISSING_ERROR);
+ let rt = context::current();
rt.spawn_blocking(func)
}
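
The `spawn_blocking` hunk above drops the call-site `expect`; the missing-runtime panic now comes from `context::current()` itself. For context, a minimal caller-side example (assuming the `rt-multi-thread` and `macros` features, which are outside this diff):

```rust
#[tokio::main]
async fn main() {
    let sum = tokio::task::spawn_blocking(|| {
        // CPU-bound or blocking work runs on the dedicated blocking pool.
        (1u64..=1_000).sum::<u64>()
    })
    .await
    .expect("blocking task panicked");

    assert_eq!(sum, 500_500);
}
```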
diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs
index 0cf2285..e6f4674 100644
--- a/src/runtime/blocking/shutdown.rs
+++ b/src/runtime/blocking/shutdown.rs
@@ -10,7 +10,7 @@ use std::time::Duration;
#[derive(Debug, Clone)]
pub(super) struct Sender {
- tx: Arc<oneshot::Sender<()>>,
+ _tx: Arc<oneshot::Sender<()>>,
}
#[derive(Debug)]
@@ -20,7 +20,7 @@ pub(super) struct Receiver {
pub(super) fn channel() -> (Sender, Receiver) {
let (tx, rx) = oneshot::channel();
- let tx = Sender { tx: Arc::new(tx) };
+ let tx = Sender { _tx: Arc::new(tx) };
let rx = Receiver { rx };
(tx, rx)
diff --git a/src/runtime/blocking/task.rs b/src/runtime/blocking/task.rs
index ee2d8d6..0b7803a 100644
--- a/src/runtime/blocking/task.rs
+++ b/src/runtime/blocking/task.rs
@@ -2,13 +2,13 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
-/// Converts a function to a future that completes on poll
+/// Converts a function to a future that completes on poll.
pub(crate) struct BlockingTask<T> {
func: Option<T>,
}
impl<T> BlockingTask<T> {
- /// Initializes a new blocking task from the given function
+ /// Initializes a new blocking task from the given function.
pub(crate) fn new(func: T) -> BlockingTask<T> {
BlockingTask { func: Some(func) }
}
diff --git a/src/runtime/context.rs b/src/runtime/context.rs
index a727ed4..1f44a53 100644
--- a/src/runtime/context.rs
+++ b/src/runtime/context.rs
@@ -1,5 +1,5 @@
//! Thread local runtime context
-use crate::runtime::Handle;
+use crate::runtime::{Handle, TryCurrentError};
use std::cell::RefCell;
@@ -7,58 +7,96 @@ thread_local! {
static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None)
}
-pub(crate) fn current() -> Option<Handle> {
- CONTEXT.with(|ctx| ctx.borrow().clone())
+pub(crate) fn try_current() -> Result<Handle, crate::runtime::TryCurrentError> {
+ match CONTEXT.try_with(|ctx| ctx.borrow().clone()) {
+ Ok(Some(handle)) => Ok(handle),
+ Ok(None) => Err(TryCurrentError::new_no_context()),
+ Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()),
+ }
+}
+
+pub(crate) fn current() -> Handle {
+ match try_current() {
+ Ok(handle) => handle,
+ Err(e) => panic!("{}", e),
+ }
}
cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
- CONTEXT.with(|ctx| {
+ match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone()
- })
+ }) {
+ Ok(io_handle) => io_handle,
+ Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
+ }
}
}
cfg_signal_internal! {
#[cfg(unix)]
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
- CONTEXT.with(|ctx| {
+ match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone()
- })
+ }) {
+ Ok(signal_handle) => signal_handle,
+ Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
+ }
}
}
cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
- CONTEXT.with(|ctx| {
+ match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone()
- })
+ }) {
+ Ok(time_handle) => time_handle,
+ Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
+ }
}
cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
- CONTEXT.with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone()))
+ match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) {
+ Ok(clock) => clock,
+ Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
+ }
}
}
}
cfg_rt! {
pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> {
- CONTEXT.with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone()))
+ match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) {
+ Ok(spawner) => spawner,
+ Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
+ }
}
}
-/// Set this [`Handle`] as the current active [`Handle`].
+/// Sets this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: Handle
pub(crate) fn enter(new: Handle) -> EnterGuard {
- CONTEXT.with(|ctx| {
- let old = ctx.borrow_mut().replace(new);
- EnterGuard(old)
- })
+ match try_enter(new) {
+ Some(guard) => guard,
+ None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
+ }
+}
+
+/// Sets this [`Handle`] as the current active [`Handle`].
+///
+/// [`Handle`]: Handle
+pub(crate) fn try_enter(new: Handle) -> Option<EnterGuard> {
+ CONTEXT
+ .try_with(|ctx| {
+ let old = ctx.borrow_mut().replace(new);
+ EnterGuard(old)
+ })
+ .ok()
}
#[derive(Debug)]
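
The context.rs changes above switch every thread-local access from `LocalKey::with` to `LocalKey::try_with`, so a lookup during thread-local destruction surfaces as an error instead of a panic inside `with`. A generic, std-only sketch of that pattern (the `CURRENT` key and string payload here are illustrative, not tokio's `CONTEXT`):

```rust
use std::cell::RefCell;

thread_local! {
    static CURRENT: RefCell<Option<String>> = RefCell::new(None);
}

fn current_name() -> Result<String, &'static str> {
    // try_with fails gracefully if this thread-local was already destroyed,
    // e.g. when called from another thread-local's destructor.
    match CURRENT.try_with(|ctx| ctx.borrow().clone()) {
        Ok(Some(name)) => Ok(name),
        Ok(None) => Err("no context set"),
        Err(_access_error) => Err("thread-local already destroyed"),
    }
}

fn main() {
    assert_eq!(current_name(), Err("no context set"));
    CURRENT.with(|ctx| *ctx.borrow_mut() = Some("main".to_string()));
    assert_eq!(current_name(), Ok("main".to_string()));
}
```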
diff --git a/src/runtime/enter.rs b/src/runtime/enter.rs
index e91408f..3f14cb5 100644
--- a/src/runtime/enter.rs
+++ b/src/runtime/enter.rs
@@ -92,7 +92,7 @@ cfg_rt_multi_thread! {
}
cfg_rt! {
- /// Disallow blocking in the current runtime context until the guard is dropped.
+ /// Disallows blocking in the current runtime context until the guard is dropped.
pub(crate) fn disallow_blocking() -> DisallowBlockingGuard {
let reset = ENTERED.with(|c| {
if let EnterContext::Entered {
diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs
index bad6a00..cd1cb76 100644
--- a/src/runtime/handle.rs
+++ b/src/runtime/handle.rs
@@ -1,9 +1,10 @@
use crate::runtime::blocking::{BlockingTask, NoopSchedule};
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{blocking, context, driver, Spawner};
-use crate::util::error::CONTEXT_MISSING_ERROR;
+use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
use std::future::Future;
+use std::marker::PhantomData;
use std::{error, fmt};
/// Handle to the runtime.
@@ -17,15 +18,25 @@ pub struct Handle {
pub(super) spawner: Spawner,
/// Handles to the I/O drivers
+ #[cfg_attr(
+ not(any(feature = "net", feature = "process", all(unix, feature = "signal"))),
+ allow(dead_code)
+ )]
pub(super) io_handle: driver::IoHandle,
/// Handles to the signal drivers
+ #[cfg_attr(
+ not(any(feature = "signal", all(unix, feature = "process"))),
+ allow(dead_code)
+ )]
pub(super) signal_handle: driver::SignalHandle,
/// Handles to the time drivers
+ #[cfg_attr(not(feature = "time"), allow(dead_code))]
pub(super) time_handle: driver::TimeHandle,
/// Source of `Instant::now()`
+ #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
pub(super) clock: driver::Clock,
/// Blocking pool spawner
@@ -41,12 +52,12 @@ pub struct Handle {
#[derive(Debug)]
#[must_use = "Creating and dropping a guard does nothing"]
pub struct EnterGuard<'a> {
- handle: &'a Handle,
- guard: context::EnterGuard,
+ _guard: context::EnterGuard,
+ _handle_lifetime: PhantomData<&'a Handle>,
}
impl Handle {
- /// Enter the runtime context. This allows you to construct types that must
+ /// Enters the runtime context. This allows you to construct types that must
/// have an executor available on creation such as [`Sleep`] or [`TcpStream`].
/// It will also allow you to call methods such as [`tokio::spawn`].
///
@@ -55,12 +66,12 @@ impl Handle {
/// [`tokio::spawn`]: fn@crate::spawn
pub fn enter(&self) -> EnterGuard<'_> {
EnterGuard {
- handle: self,
- guard: context::enter(self.clone()),
+ _guard: context::enter(self.clone()),
+ _handle_lifetime: PhantomData,
}
}
- /// Returns a `Handle` view over the currently running `Runtime`
+ /// Returns a `Handle` view over the currently running `Runtime`.
///
/// # Panic
///
@@ -99,7 +110,7 @@ impl Handle {
/// # }
/// ```
pub fn current() -> Self {
- context::current().expect(CONTEXT_MISSING_ERROR)
+ context::current()
}
/// Returns a Handle view over the currently running Runtime
@@ -108,7 +119,7 @@ impl Handle {
///
/// Contrary to `current`, this never panics
pub fn try_current() -> Result<Self, TryCurrentError> {
- context::current().ok_or(TryCurrentError(()))
+ context::try_current()
}
cfg_stats! {
@@ -119,7 +130,7 @@ impl Handle {
}
}
- /// Spawn a future onto the Tokio runtime.
+ /// Spawns a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
@@ -157,7 +168,7 @@ impl Handle {
self.spawner.spawn(future)
}
- /// Run the provided function on an executor dedicated to blocking
+ /// Runs the provided function on an executor dedicated to blocking
/// operations.
///
/// # Examples
@@ -182,7 +193,11 @@ impl Handle {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
- self.spawn_blocking_inner(func, None)
+ if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
+ self.spawn_blocking_inner(Box::new(func), None)
+ } else {
+ self.spawn_blocking_inner(func, None)
+ }
}
#[cfg_attr(tokio_track_caller, track_caller)]
@@ -226,7 +241,7 @@ impl Handle {
handle
}
- /// Run a future to completion on this `Handle`'s associated `Runtime`.
+ /// Runs a future to completion on this `Handle`'s associated `Runtime`.
///
/// This runs the given future on the current thread, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
@@ -319,17 +334,60 @@ impl Handle {
}
/// Error returned by `try_current` when no Runtime has been started
-pub struct TryCurrentError(());
+#[derive(Debug)]
+pub struct TryCurrentError {
+ kind: TryCurrentErrorKind,
+}
+
+impl TryCurrentError {
+ pub(crate) fn new_no_context() -> Self {
+ Self {
+ kind: TryCurrentErrorKind::NoContext,
+ }
+ }
+
+ pub(crate) fn new_thread_local_destroyed() -> Self {
+ Self {
+ kind: TryCurrentErrorKind::ThreadLocalDestroyed,
+ }
+ }
-impl fmt::Debug for TryCurrentError {
+ /// Returns true if the call failed because there is currently no runtime in
+ /// the Tokio context.
+ pub fn is_missing_context(&self) -> bool {
+ matches!(self.kind, TryCurrentErrorKind::NoContext)
+ }
+
+ /// Returns true if the call failed because the Tokio context thread-local
+ /// had been destroyed. This can usually only happen in the destructor of
+ /// other thread-locals.
+ pub fn is_thread_local_destroyed(&self) -> bool {
+ matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed)
+ }
+}
+
+enum TryCurrentErrorKind {
+ NoContext,
+ ThreadLocalDestroyed,
+}
+
+impl fmt::Debug for TryCurrentErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TryCurrentError").finish()
+ use TryCurrentErrorKind::*;
+ match self {
+ NoContext => f.write_str("NoContext"),
+ ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"),
+ }
}
}
impl fmt::Display for TryCurrentError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.write_str(CONTEXT_MISSING_ERROR)
+ use TryCurrentErrorKind::*;
+ match self.kind {
+ NoContext => f.write_str(CONTEXT_MISSING_ERROR),
+ ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
+ }
}
}
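
With the richer `TryCurrentError` above, callers can distinguish the two failure modes of `Handle::try_current`. A small usage sketch (assumes the `rt` and `rt-multi-thread` features; the helper function and its strings are illustrative, not part of this change):

```rust
use tokio::runtime::Handle;

fn describe_context() -> &'static str {
    match Handle::try_current() {
        Ok(_handle) => "inside a Tokio runtime",
        Err(e) if e.is_missing_context() => "no runtime on this thread",
        Err(e) if e.is_thread_local_destroyed() => "thread-local already torn down",
        Err(_) => "unknown context error",
    }
}

fn main() {
    // Outside any runtime, the missing-context case is reported.
    assert_eq!(describe_context(), "no runtime on this thread");

    let rt = tokio::runtime::Runtime::new().expect("failed to build runtime");
    rt.block_on(async {
        assert_eq!(describe_context(), "inside a Tokio runtime");
    });
}
```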
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index ec7d0c0..96bb47c 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -205,7 +205,7 @@ cfg_rt! {
use self::enter::enter;
mod handle;
- pub use handle::{EnterGuard, Handle};
+ pub use handle::{EnterGuard, Handle, TryCurrentError};
mod spawner;
use self::spawner::Spawner;
@@ -294,7 +294,7 @@ cfg_rt! {
type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
impl Runtime {
- /// Create a new runtime instance with default configuration values.
+ /// Creates a new runtime instance with default configuration values.
///
/// This results in the multi threaded scheduler, I/O driver, and time driver being
/// initialized.
@@ -329,7 +329,7 @@ cfg_rt! {
Builder::new_multi_thread().enable_all().build()
}
- /// Return a handle to the runtime's spawner.
+ /// Returns a handle to the runtime's spawner.
///
/// The returned handle can be used to spawn tasks that run on this runtime, and can
/// be cloned to allow moving the `Handle` to other threads.
@@ -350,7 +350,7 @@ cfg_rt! {
&self.handle
}
- /// Spawn a future onto the Tokio runtime.
+ /// Spawns a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
@@ -384,7 +384,7 @@ cfg_rt! {
self.handle.spawn(future)
}
- /// Run the provided function on an executor dedicated to blocking operations.
+ /// Runs the provided function on an executor dedicated to blocking operations.
///
/// # Examples
///
@@ -409,7 +409,7 @@ cfg_rt! {
self.handle.spawn_blocking(func)
}
- /// Run a future to completion on the Tokio runtime. This is the
+ /// Runs a future to completion on the Tokio runtime. This is the
/// runtime's entry point.
///
/// This runs the given future on the current thread, blocking until it is
@@ -464,7 +464,7 @@ cfg_rt! {
}
}
- /// Enter the runtime context.
+ /// Enters the runtime context.
///
/// This allows you to construct types that must have an executor
/// available on creation such as [`Sleep`] or [`TcpStream`]. It will
@@ -500,7 +500,7 @@ cfg_rt! {
self.handle.enter()
}
- /// Shutdown the runtime, waiting for at most `duration` for all spawned
+ /// Shuts down the runtime, waiting for at most `duration` for all spawned
/// task to shutdown.
///
/// Usually, dropping a `Runtime` handle is sufficient as tasks are able to
@@ -537,11 +537,11 @@ cfg_rt! {
/// ```
pub fn shutdown_timeout(mut self, duration: Duration) {
// Wakeup and shutdown all the worker threads
- self.handle.shutdown();
+ self.handle.clone().shutdown();
self.blocking_pool.shutdown(Some(duration));
}
- /// Shutdown the runtime, without waiting for any spawned tasks to shutdown.
+ /// Shuts down the runtime, without waiting for any spawned tasks to shutdown.
///
/// This can be useful if you want to drop a runtime from within another runtime.
/// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
@@ -571,4 +571,30 @@ cfg_rt! {
self.shutdown_timeout(Duration::from_nanos(0))
}
}
+
+ #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
+ impl Drop for Runtime {
+ fn drop(&mut self) {
+ match &mut self.kind {
+ Kind::CurrentThread(basic) => {
+ // This ensures that tasks spawned on the basic runtime are dropped inside the
+ // runtime's context.
+ match self::context::try_enter(self.handle.clone()) {
+ Some(guard) => basic.set_context_guard(guard),
+ None => {
+ // The context thread-local has already been destroyed.
+ //
+ // We don't set the guard in this case. Calls to tokio::spawn in task
+ // destructors would fail regardless if this happens.
+ },
+ }
+ },
+ #[cfg(feature = "rt-multi-thread")]
+ Kind::ThreadPool(_) => {
+ // The threaded scheduler drops its tasks on its worker threads, which is
+ // already in the runtime's context.
+ },
+ }
+ }
+ }
}
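
`shutdown_timeout` above now clones the handle before shutting down, and the new `Drop` impl enters the runtime context so tasks dropped with a current-thread runtime still see it. A short usage sketch of the shutdown path (assumes the `rt-multi-thread` and `time` features; the durations are illustrative only):

```rust
use std::time::Duration;

fn main() {
    let rt = tokio::runtime::Runtime::new().expect("failed to build runtime");

    rt.spawn(async {
        // A task that would otherwise outlive the shutdown window.
        tokio::time::sleep(Duration::from_secs(60)).await;
    });

    // Wait at most 100ms for spawned tasks, then tear the runtime down.
    rt.shutdown_timeout(Duration::from_millis(100));
}
```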
diff --git a/src/runtime/stats/stats.rs b/src/runtime/stats/stats.rs
index 39a48ae..b2bcacc 100644
--- a/src/runtime/stats/stats.rs
+++ b/src/runtime/stats/stats.rs
@@ -1,6 +1,9 @@
//! This file contains the types necessary to collect various types of stats.
use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
+use std::convert::TryFrom;
+use std::time::{Duration, Instant};
+
/// This type contains methods to retrieve stats from a Tokio runtime.
#[derive(Debug)]
pub struct RuntimeStats {
@@ -14,6 +17,7 @@ pub struct WorkerStats {
park_count: AtomicU64,
steal_count: AtomicU64,
poll_count: AtomicU64,
+ busy_duration_total: AtomicU64,
}
impl RuntimeStats {
@@ -24,6 +28,7 @@ impl RuntimeStats {
park_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
+ busy_duration_total: AtomicU64::new(0),
});
}
@@ -54,6 +59,11 @@ impl WorkerStats {
pub fn poll_count(&self) -> u64 {
self.poll_count.load(Relaxed)
}
+
+ /// Returns the total amount of time this worker has been busy for.
+ pub fn total_busy_duration(&self) -> Duration {
+ Duration::from_nanos(self.busy_duration_total.load(Relaxed))
+ }
}
pub(crate) struct WorkerStatsBatcher {
@@ -61,6 +71,9 @@ pub(crate) struct WorkerStatsBatcher {
park_count: u64,
steal_count: u64,
poll_count: u64,
+ /// The total busy duration in nanoseconds.
+ busy_duration_total: u64,
+ last_resume_time: Instant,
}
impl WorkerStatsBatcher {
@@ -70,6 +83,8 @@ impl WorkerStatsBatcher {
park_count: 0,
steal_count: 0,
poll_count: 0,
+ busy_duration_total: 0,
+ last_resume_time: Instant::now(),
}
}
pub(crate) fn submit(&mut self, to: &RuntimeStats) {
@@ -78,13 +93,23 @@ impl WorkerStatsBatcher {
worker.park_count.store(self.park_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);
+
+ worker
+ .busy_duration_total
+ .store(self.busy_duration_total, Relaxed);
}
pub(crate) fn about_to_park(&mut self) {
self.park_count += 1;
+
+ let busy_duration = self.last_resume_time.elapsed();
+ let busy_duration = u64::try_from(busy_duration.as_nanos()).unwrap_or(u64::MAX);
+ self.busy_duration_total += busy_duration;
}
- pub(crate) fn returned_from_park(&mut self) {}
+ pub(crate) fn returned_from_park(&mut self) {
+ self.last_resume_time = Instant::now();
+ }
#[cfg(feature = "rt-multi-thread")]
pub(crate) fn incr_steal_count(&mut self, by: u16) {
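
The stats hunks above accumulate per-worker busy time by stamping a resume `Instant` when the worker unparks and adding the elapsed nanoseconds, clamped into a `u64`, when it parks again. A standalone, std-only sketch of the same bookkeeping (not the actual `WorkerStatsBatcher`):

```rust
use std::convert::TryFrom;
use std::time::{Duration, Instant};

struct BusyTimer {
    /// Total busy time in nanoseconds.
    busy_duration_total: u64,
    last_resume_time: Instant,
}

impl BusyTimer {
    fn new() -> Self {
        Self {
            busy_duration_total: 0,
            last_resume_time: Instant::now(),
        }
    }

    /// Called when the worker wakes up and starts doing work.
    fn returned_from_park(&mut self) {
        self.last_resume_time = Instant::now();
    }

    /// Called right before the worker parks again.
    fn about_to_park(&mut self) {
        let busy = self.last_resume_time.elapsed();
        // u128 nanoseconds clamped into u64, exactly like the hunk above.
        self.busy_duration_total += u64::try_from(busy.as_nanos()).unwrap_or(u64::MAX);
    }

    fn total_busy_duration(&self) -> Duration {
        Duration::from_nanos(self.busy_duration_total)
    }
}

fn main() {
    let mut t = BusyTimer::new();
    t.returned_from_park();
    std::thread::sleep(Duration::from_millis(5));
    t.about_to_park();
    assert!(t.total_busy_duration() >= Duration::from_millis(5));
}
```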
diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs
index 51b6496..776e834 100644
--- a/src/runtime/task/core.rs
+++ b/src/runtime/task/core.rs
@@ -44,22 +44,22 @@ pub(super) struct CoreStage<T: Future> {
///
/// Holds the future or output, depending on the stage of execution.
pub(super) struct Core<T: Future, S> {
- /// Scheduler used to drive this future
+ /// Scheduler used to drive this future.
pub(super) scheduler: S,
- /// Either the future or the output
+ /// Either the future or the output.
pub(super) stage: CoreStage<T>,
}
/// Crate public as this is also needed by the pool.
#[repr(C)]
pub(crate) struct Header {
- /// Task state
+ /// Task state.
pub(super) state: State,
pub(super) owned: UnsafeCell<linked_list::Pointers<Header>>,
- /// Pointer to next task, used with the injection queue
+ /// Pointer to next task, used with the injection queue.
pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,
/// Table of function pointers for executing actions on the task.
@@ -133,7 +133,7 @@ impl<T: Future> CoreStage<T> {
self.stage.with_mut(f)
}
- /// Poll the future
+ /// Polls the future.
///
/// # Safety
///
@@ -169,7 +169,7 @@ impl<T: Future> CoreStage<T> {
res
}
- /// Drop the future
+ /// Drops the future.
///
/// # Safety
///
@@ -181,7 +181,7 @@ impl<T: Future> CoreStage<T> {
}
}
- /// Store the task output
+ /// Stores the task output.
///
/// # Safety
///
@@ -193,7 +193,7 @@ impl<T: Future> CoreStage<T> {
}
}
- /// Take the task output
+ /// Takes the task output.
///
/// # Safety
///
diff --git a/src/runtime/task/error.rs b/src/runtime/task/error.rs
index 17fb093..1a8129b 100644
--- a/src/runtime/task/error.rs
+++ b/src/runtime/task/error.rs
@@ -29,12 +29,12 @@ impl JoinError {
}
}
- /// Returns true if the error was caused by the task being cancelled
+ /// Returns true if the error was caused by the task being cancelled.
pub fn is_cancelled(&self) -> bool {
matches!(&self.repr, Repr::Cancelled)
}
- /// Returns true if the error was caused by the task panicking
+ /// Returns true if the error was caused by the task panicking.
///
/// # Examples
///
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs
index 41b4193..0996e52 100644
--- a/src/runtime/task/harness.rs
+++ b/src/runtime/task/harness.rs
@@ -10,7 +10,7 @@ use std::panic;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};
-/// Typed raw task handle
+/// Typed raw task handle.
pub(super) struct Harness<T: Future, S: 'static> {
cell: NonNull<Cell<T, S>>,
}
@@ -74,7 +74,7 @@ where
}
}
- /// Poll the task and cancel it if necessary. This takes ownership of a
+ /// Polls the task and cancels it if necessary. This takes ownership of a
/// ref-count.
///
/// If the return value is Notified, the caller is given ownership of two
@@ -124,7 +124,7 @@ where
}
}
- /// Forcibly shutdown the task
+ /// Forcibly shuts down the task.
///
/// Attempt to transition to `Running` in order to forcibly shutdown the
/// task. If the task is currently running or in a state of completion, then
@@ -192,7 +192,7 @@ where
}
}
- /// Remotely abort the task.
+ /// Remotely aborts the task.
///
/// The caller should hold a ref-count, but we do not consume it.
///
@@ -280,7 +280,7 @@ where
// ====== internal ======
- /// Complete the task. This method assumes that the state is RUNNING.
+ /// Completes the task. This method assumes that the state is RUNNING.
fn complete(self) {
// The future has completed and its output has been written to the task
// stage. We transition from running to complete.
@@ -310,7 +310,7 @@ where
}
}
- /// Release the task from the scheduler. Returns the number of ref-counts
+ /// Releases the task from the scheduler. Returns the number of ref-counts
/// that should be decremented.
fn release(&self) -> usize {
// We don't actually increment the ref-count here, but the new task is
@@ -325,7 +325,7 @@ where
}
}
- /// Create a new task that holds its own ref-count.
+ /// Creates a new task that holds its own ref-count.
///
/// # Safety
///
@@ -425,7 +425,7 @@ enum PollFuture {
Dealloc,
}
-/// Cancel the task and store the appropriate error in the stage field.
+/// Cancels the task and stores the appropriate error in the stage field.
fn cancel_task<T: Future>(stage: &CoreStage<T>) {
// Drop the future from a panic guard.
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
@@ -442,7 +442,7 @@ fn cancel_task<T: Future>(stage: &CoreStage<T>) {
}
}
-/// Poll the future. If the future completes, the output is written to the
+/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
// Poll the future.
diff --git a/src/runtime/task/inject.rs b/src/runtime/task/inject.rs
index d1f0aee..1585e13 100644
--- a/src/runtime/task/inject.rs
+++ b/src/runtime/task/inject.rs
@@ -11,7 +11,7 @@ use std::sync::atomic::Ordering::{Acquire, Release};
/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
/// overflow queue when the local, fixed-size, array queue overflows.
pub(crate) struct Inject<T: 'static> {
- /// Pointers to the head and tail of the queue
+ /// Pointers to the head and tail of the queue.
pointers: Mutex<Pointers>,
/// Number of pending tasks in the queue. This helps prevent unnecessary
@@ -22,13 +22,13 @@ pub(crate) struct Inject<T: 'static> {
}
struct Pointers {
- /// True if the queue is closed
+ /// True if the queue is closed.
is_closed: bool,
- /// Linked-list head
+ /// Linked-list head.
head: Option<NonNull<task::Header>>,
- /// Linked-list tail
+ /// Linked-list tail.
tail: Option<NonNull<task::Header>>,
}
@@ -52,7 +52,7 @@ impl<T: 'static> Inject<T> {
self.len() == 0
}
- /// Close the injection queue, returns `true` if the queue is open when the
+ /// Closes the injection queue, returning `true` if the queue is open when the
/// transition is made.
pub(crate) fn close(&self) -> bool {
let mut p = self.pointers.lock();
@@ -137,7 +137,7 @@ impl<T: 'static> Inject<T> {
self.push_batch_inner(first, prev, counter);
}
- /// Insert several tasks that have been linked together into the queue.
+ /// Inserts several tasks that have been linked together into the queue.
///
/// The provided head and tail may be be the same task. In this case, a
/// single task is inserted.
diff --git a/src/runtime/task/list.rs b/src/runtime/task/list.rs
index edd3c4f..7758f8d 100644
--- a/src/runtime/task/list.rs
+++ b/src/runtime/task/list.rs
@@ -78,7 +78,7 @@ impl<S: 'static> OwnedTasks<S> {
}
}
- /// Bind the provided task to this OwnedTasks instance. This fails if the
+ /// Binds the provided task to this OwnedTasks instance. This fails if the
/// OwnedTasks has been closed.
pub(crate) fn bind<T>(
&self,
@@ -110,7 +110,7 @@ impl<S: 'static> OwnedTasks<S> {
}
}
- /// Assert that the given task is owned by this OwnedTasks and convert it to
+ /// Asserts that the given task is owned by this OwnedTasks and converts it to
/// a LocalNotified, giving the thread permission to poll this task.
#[inline]
pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
@@ -124,7 +124,7 @@ impl<S: 'static> OwnedTasks<S> {
}
}
- /// Shut down all tasks in the collection. This call also closes the
+ /// Shuts down all tasks in the collection. This call also closes the
/// collection, preventing new items from being added.
pub(crate) fn close_and_shutdown_all(&self)
where
@@ -213,7 +213,7 @@ impl<S: 'static> LocalOwnedTasks<S> {
}
}
- /// Shut down all tasks in the collection. This call also closes the
+ /// Shuts down all tasks in the collection. This call also closes the
/// collection, preventing new items from being added.
pub(crate) fn close_and_shutdown_all(&self)
where
@@ -241,7 +241,7 @@ impl<S: 'static> LocalOwnedTasks<S> {
unsafe { inner.list.remove(task.header().into()) })
}
- /// Assert that the given task is owned by this LocalOwnedTasks and convert
+ /// Asserts that the given task is owned by this LocalOwnedTasks and converts
/// it to a LocalNotified, giving the thread permission to poll this task.
#[inline]
pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs
index 5f0d5ab..1f18209 100644
--- a/src/runtime/task/mod.rs
+++ b/src/runtime/task/mod.rs
@@ -173,7 +173,7 @@ use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};
-/// An owned handle to the task, tracked by ref count
+/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
raw: RawTask,
@@ -211,7 +211,7 @@ pub(crate) struct UnownedTask<S: 'static> {
unsafe impl<S> Send for UnownedTask<S> {}
unsafe impl<S> Sync for UnownedTask<S> {}
-/// Task result sent back
+/// Task result sent back.
pub(crate) type Result<T> = std::result::Result<T, JoinError>;
pub(crate) trait Schedule: Sync + Sized + 'static {
@@ -260,7 +260,7 @@ cfg_rt! {
(task, notified, join)
}
- /// Create a new task with an associated join handle. This method is used
+ /// Creates a new task with an associated join handle. This method is used
/// only when the task is not going to be stored in an `OwnedTasks` list.
///
/// Currently only blocking tasks use this method.
@@ -327,7 +327,7 @@ cfg_rt_multi_thread! {
}
impl<S: Schedule> Task<S> {
- /// Pre-emptively cancel the task as part of the shutdown process.
+ /// Pre-emptively cancels the task as part of the shutdown process.
pub(crate) fn shutdown(self) {
let raw = self.raw;
mem::forget(self);
@@ -336,7 +336,7 @@ impl<S: Schedule> Task<S> {
}
impl<S: Schedule> LocalNotified<S> {
- /// Run the task
+ /// Runs the task.
pub(crate) fn run(self) {
let raw = self.task.raw;
mem::forget(self);
@@ -420,7 +420,7 @@ impl<S> fmt::Debug for Notified<S> {
/// # Safety
///
-/// Tasks are pinned
+/// Tasks are pinned.
unsafe impl<S> linked_list::Link for Task<S> {
type Handle = Task<S>;
type Target = Header;
diff --git a/src/runtime/task/raw.rs b/src/runtime/task/raw.rs
index 8c2c3f7..fbc9574 100644
--- a/src/runtime/task/raw.rs
+++ b/src/runtime/task/raw.rs
@@ -10,22 +10,22 @@ pub(super) struct RawTask {
}
pub(super) struct Vtable {
- /// Poll the future
+ /// Polls the future.
pub(super) poll: unsafe fn(NonNull<Header>),
- /// Deallocate the memory
+ /// Deallocates the memory.
pub(super) dealloc: unsafe fn(NonNull<Header>),
- /// Read the task output, if complete
+ /// Reads the task output, if complete.
pub(super) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker),
- /// The join handle has been dropped
+ /// The join handle has been dropped.
pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),
- /// The task is remotely aborted
+ /// The task is remotely aborted.
pub(super) remote_abort: unsafe fn(NonNull<Header>),
- /// Scheduler is being shutdown
+ /// The scheduler is being shut down.
pub(super) shutdown: unsafe fn(NonNull<Header>),
}
diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs
index 059a7f9..c2d5b28 100644
--- a/src/runtime/task/state.rs
+++ b/src/runtime/task/state.rs
@@ -8,7 +8,7 @@ pub(super) struct State {
val: AtomicUsize,
}
-/// Current state value
+/// Current state value.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
@@ -19,20 +19,20 @@ const RUNNING: usize = 0b0001;
/// The task is complete.
///
-/// Once this bit is set, it is never unset
+/// Once this bit is set, it is never unset.
const COMPLETE: usize = 0b0010;
-/// Extracts the task's lifecycle value from the state
+/// Extracts the task's lifecycle value from the state.
const LIFECYCLE_MASK: usize = 0b11;
/// Flag tracking if the task has been pushed into a run queue.
const NOTIFIED: usize = 0b100;
-/// The join handle is still around
+/// The join handle is still around.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_INTEREST: usize = 0b1_000;
-/// A join handle waker has been set
+/// A join handle waker has been set.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_WAKER: usize = 0b10_000;
@@ -40,19 +40,19 @@ const JOIN_WAKER: usize = 0b10_000;
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const CANCELLED: usize = 0b100_000;
-/// All bits
+/// All bits.
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
/// Bits used by the ref count portion of the state.
const REF_COUNT_MASK: usize = !STATE_MASK;
-/// Number of positions to shift the ref count
+/// Number of positions to shift the ref count.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
-/// One ref count
+/// One ref count.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
-/// State a task is initialized with
+/// State a task is initialized with.
///
/// A task is initialized with three references:
///
@@ -96,7 +96,7 @@ pub(super) enum TransitionToNotifiedByRef {
/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
- /// Return a task's initial state
+ /// Returns a task's initial state.
pub(super) fn new() -> State {
// The raw task returned by this method has a ref-count of three. See
// the comment on INITIAL_STATE for more.
@@ -110,7 +110,7 @@ impl State {
Snapshot(self.val.load(Acquire))
}
- /// Attempt to transition the lifecycle to `Running`. This sets the
+ /// Attempts to transition the lifecycle to `Running`. This sets the
/// notified bit to false so notifications during the poll can be detected.
pub(super) fn transition_to_running(&self) -> TransitionToRunning {
self.fetch_update_action(|mut next| {
@@ -190,7 +190,7 @@ impl State {
Snapshot(prev.0 ^ DELTA)
}
- /// Transition from `Complete` -> `Terminal`, decrementing the reference
+ /// Transitions from `Complete` -> `Terminal`, decrementing the reference
/// count the specified number of times.
///
/// Returns true if the task should be deallocated.
@@ -270,10 +270,10 @@ impl State {
})
}
- /// Set the cancelled bit and transition the state to `NOTIFIED` if idle.
+ /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
///
/// Returns `true` if the task needs to be submitted to the pool for
- /// execution
+ /// execution.
pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
self.fetch_update_action(|mut snapshot| {
if snapshot.is_cancelled() || snapshot.is_complete() {
@@ -306,7 +306,7 @@ impl State {
})
}
- /// Set the `CANCELLED` bit and attempt to transition to `Running`.
+ /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
///
/// Returns `true` if the transition to `Running` succeeded.
pub(super) fn transition_to_shutdown(&self) -> bool {
@@ -330,7 +330,7 @@ impl State {
}
/// Optimistically tries to swap the state assuming the join handle is
- /// __immediately__ dropped on spawn
+ /// __immediately__ dropped on spawn.
pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
use std::sync::atomic::Ordering::Relaxed;
@@ -352,7 +352,7 @@ impl State {
.map_err(|_| ())
}
- /// Try to unset the JOIN_INTEREST flag.
+ /// Tries to unset the JOIN_INTEREST flag.
///
/// Returns `Ok` if the operation happens before the task transitions to a
/// completed state, `Err` otherwise.
@@ -371,7 +371,7 @@ impl State {
})
}
- /// Set the `JOIN_WAKER` bit.
+ /// Sets the `JOIN_WAKER` bit.
///
/// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
/// the task has completed.
diff --git a/src/runtime/tests/loom_basic_scheduler.rs b/src/runtime/tests/loom_basic_scheduler.rs
index e6221d3..d2894b9 100644
--- a/src/runtime/tests/loom_basic_scheduler.rs
+++ b/src/runtime/tests/loom_basic_scheduler.rs
@@ -63,6 +63,45 @@ fn block_on_num_polls() {
});
}
+#[test]
+fn assert_no_unnecessary_polls() {
+ loom::model(|| {
+ // After we poll the outer future, woken should be reset to false.
+ let rt = Builder::new_current_thread().build().unwrap();
+ let (tx, rx) = oneshot::channel();
+ let pending_cnt = Arc::new(AtomicUsize::new(0));
+
+ rt.spawn(async move {
+ for _ in 0..24 {
+ task::yield_now().await;
+ }
+ tx.send(()).unwrap();
+ });
+
+ let pending_cnt_clone = pending_cnt.clone();
+ rt.block_on(async move {
+ // Use task::yield_now() to ensure woken is set to true.
+ // ResetFuture should return Pending at most once. Two cases:
+ // 1. No message has arrived from the channel yet: ResetFuture is polled,
+ // returns Pending, and pending_cnt is incremented. Once the message
+ // arrives, ResetFuture returns Ready, so we expect pending_cnt == 1.
+ // 2. The message has already arrived: ResetFuture returns Ready
+ // immediately, so we expect pending_cnt == 0.
+ task::yield_now().await;
+ ResetFuture {
+ rx,
+ pending_cnt: pending_cnt_clone,
+ }
+ .await;
+ });
+
+ let pending_cnt = pending_cnt.load(Acquire);
+ assert!(pending_cnt <= 1);
+ });
+}
+
struct BlockedFuture {
rx: Receiver<()>,
num_polls: Arc<AtomicUsize>,
@@ -80,3 +119,22 @@ impl Future for BlockedFuture {
}
}
}
+
+struct ResetFuture {
+ rx: Receiver<()>,
+ pending_cnt: Arc<AtomicUsize>,
+}
+
+impl Future for ResetFuture {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match Pin::new(&mut self.rx).poll(cx) {
+ Poll::Pending => {
+ self.pending_cnt.fetch_add(1, Release);
+ Poll::Pending
+ }
+ _ => Poll::Ready(()),
+ }
+ }
+}
diff --git a/src/runtime/thread_pool/idle.rs b/src/runtime/thread_pool/idle.rs
index 2cac30e..6b7ee12 100644
--- a/src/runtime/thread_pool/idle.rs
+++ b/src/runtime/thread_pool/idle.rs
@@ -126,7 +126,7 @@ impl Idle {
}
}
- /// Returns `true` if `worker_id` is contained in the sleep set
+ /// Returns `true` if `worker_id` is contained in the sleep set.
pub(super) fn is_parked(&self, worker_id: usize) -> bool {
let sleepers = self.sleepers.lock();
sleepers.contains(&worker_id)
diff --git a/src/runtime/thread_pool/mod.rs b/src/runtime/thread_pool/mod.rs
index f2e68f6..82e34c7 100644
--- a/src/runtime/thread_pool/mod.rs
+++ b/src/runtime/thread_pool/mod.rs
@@ -24,7 +24,7 @@ pub(crate) struct ThreadPool {
spawner: Spawner,
}
-/// Submit futures to the associated thread pool for execution.
+/// Submits futures to the associated thread pool for execution.
///
/// A `Spawner` instance is a handle to a single thread pool that allows the owner
/// of the handle to spawn futures onto the thread pool.
diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs
index 44f8db8..ae8efe6 100644
--- a/src/runtime/thread_pool/worker.rs
+++ b/src/runtime/thread_pool/worker.rs
@@ -126,7 +126,7 @@ pub(super) struct Shared {
/// how they communicate between each other.
remotes: Box<[Remote]>,
- /// Submit work to the scheduler while **not** currently on a worker thread.
+ /// Submits work to the scheduler while **not** currently on a worker thread.
inject: Inject<Arc<Shared>>,
/// Coordinates idle workers
@@ -147,13 +147,13 @@ pub(super) struct Shared {
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
- /// Collect stats from the runtime.
+ /// Collects stats from the runtime.
stats: RuntimeStats,
}
/// Used to communicate with a worker from other threads.
struct Remote {
- /// Steal tasks from this worker.
+ /// Steals tasks from this worker.
steal: queue::Steal<Arc<Shared>>,
/// Unparks the associated worker thread
@@ -587,9 +587,9 @@ impl Core {
worker.shared.transition_worker_from_searching();
}
- /// Prepare the worker state for parking
+ /// Prepares the worker state for parking.
///
- /// Returns true if the transition happend, false if there is work to do first
+ /// Returns true if the transition happened, false if there is work to do first.
fn transition_to_parked(&mut self, worker: &Worker) -> bool {
// Workers should not park if they have work to do
if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
@@ -653,7 +653,7 @@ impl Core {
self.stats.submit(&worker.shared.stats);
}
- /// Shutdown the core
+ /// Shuts down the core.
fn shutdown(&mut self) {
// Take the core
let mut park = self.park.take().expect("park missing");
@@ -666,7 +666,7 @@ impl Core {
}
impl Worker {
- /// Returns a reference to the scheduler's injection queue
+ /// Returns a reference to the scheduler's injection queue.
fn inject(&self) -> &Inject<Arc<Shared>> {
&self.shared.inject
}
diff --git a/src/signal/ctrl_c.rs b/src/signal/ctrl_c.rs
index 1eeeb85..b26ab7e 100644
--- a/src/signal/ctrl_c.rs
+++ b/src/signal/ctrl_c.rs
@@ -47,6 +47,15 @@ use std::io;
/// println!("received ctrl-c event");
/// }
/// ```
+///
+/// Listen in the background:
+///
+/// ```rust,no_run
+/// tokio::spawn(async move {
+/// tokio::signal::ctrl_c().await.unwrap();
+/// // Your handler here
+/// });
+/// ```
pub async fn ctrl_c() -> io::Result<()> {
os_impl::ctrl_c()?.recv().await;
Ok(())
diff --git a/src/signal/mod.rs b/src/signal/mod.rs
index fe572f0..882218a 100644
--- a/src/signal/mod.rs
+++ b/src/signal/mod.rs
@@ -1,4 +1,4 @@
-//! Asynchronous signal handling for Tokio
+//! Asynchronous signal handling for Tokio.
//!
//! Note that signal handling is in general a very tricky topic and should be
//! used with great care. This crate attempts to implement 'best practice' for
diff --git a/src/signal/reusable_box.rs b/src/signal/reusable_box.rs
index 426ecb0..796fa21 100644
--- a/src/signal/reusable_box.rs
+++ b/src/signal/reusable_box.rs
@@ -30,7 +30,7 @@ impl<T> ReusableBoxFuture<T> {
Self { boxed }
}
- /// Replace the future currently stored in this box.
+ /// Replaces the future currently stored in this box.
///
/// This reallocates if and only if the layout of the provided future is
/// different from the layout of the currently stored future.
@@ -43,7 +43,7 @@ impl<T> ReusableBoxFuture<T> {
}
}
- /// Replace the future currently stored in this box.
+ /// Replaces the future currently stored in this box.
///
/// This function never reallocates, but returns an error if the provided
/// future has a different size or alignment from the currently stored
@@ -70,7 +70,7 @@ impl<T> ReusableBoxFuture<T> {
}
}
- /// Set the current future.
+ /// Sets the current future.
///
/// # Safety
///
@@ -103,14 +103,14 @@ impl<T> ReusableBoxFuture<T> {
}
}
- /// Get a pinned reference to the underlying future.
+ /// Gets a pinned reference to the underlying future.
pub(crate) fn get_pin(&mut self) -> Pin<&mut (dyn Future<Output = T> + Send)> {
// SAFETY: The user of this box cannot move the box, and we do not move it
// either.
unsafe { Pin::new_unchecked(self.boxed.as_mut()) }
}
- /// Poll the future stored inside this box.
+ /// Polls the future stored inside this box.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<T> {
self.get_pin().poll(cx)
}
@@ -119,7 +119,7 @@ impl<T> ReusableBoxFuture<T> {
impl<T> Future for ReusableBoxFuture<T> {
type Output = T;
- /// Poll the future stored inside this box.
+ /// Polls the future stored inside this box.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
Pin::into_inner(self).get_pin().poll(cx)
}
diff --git a/src/sync/batch_semaphore.rs b/src/sync/batch_semaphore.rs
index 9b43404..b5c39d2 100644
--- a/src/sync/batch_semaphore.rs
+++ b/src/sync/batch_semaphore.rs
@@ -1,5 +1,5 @@
#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
-//! # Implementation Details
+//! # Implementation Details.
//!
//! The semaphore is implemented using an intrusive linked list of waiters. An
//! atomic counter tracks the number of available permits. If the semaphore does
@@ -138,7 +138,7 @@ impl Semaphore {
}
}
- /// Creates a new semaphore with the initial number of permits
+ /// Creates a new semaphore with the initial number of permits.
///
/// Maximum number of permits on 32-bit platforms is `1<<29`.
///
@@ -159,7 +159,7 @@ impl Semaphore {
}
}
- /// Returns the current number of available permits
+ /// Returns the current number of available permits.
pub(crate) fn available_permits(&self) -> usize {
self.permits.load(Acquire) >> Self::PERMIT_SHIFT
}
@@ -197,7 +197,7 @@ impl Semaphore {
}
}
- /// Returns true if the semaphore is closed
+ /// Returns true if the semaphore is closed.
pub(crate) fn is_closed(&self) -> bool {
self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
}
diff --git a/src/sync/broadcast.rs b/src/sync/broadcast.rs
index a2ca445..0d9cd3b 100644
--- a/src/sync/broadcast.rs
+++ b/src/sync/broadcast.rs
@@ -293,37 +293,37 @@ pub mod error {
use self::error::*;
-/// Data shared between senders and receivers
+/// Data shared between senders and receivers.
struct Shared<T> {
- /// slots in the channel
+ /// Slots in the channel.
buffer: Box<[RwLock<Slot<T>>]>,
- /// Mask a position -> index
+ /// Mask a position -> index.
mask: usize,
/// Tail of the queue. Includes the rx wait list.
tail: Mutex<Tail>,
- /// Number of outstanding Sender handles
+ /// Number of outstanding Sender handles.
num_tx: AtomicUsize,
}
-/// Next position to write a value
+/// Next position to write a value.
struct Tail {
- /// Next position to write to
+ /// Next position to write to.
pos: u64,
- /// Number of active receivers
+ /// Number of active receivers.
rx_cnt: usize,
- /// True if the channel is closed
+ /// True if the channel is closed.
closed: bool,
- /// Receivers waiting for a value
+ /// Receivers waiting for a value.
waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
-/// Slot in the buffer
+/// Slot in the buffer.
struct Slot<T> {
/// Remaining number of receivers that are expected to see this value.
///
@@ -333,7 +333,7 @@ struct Slot<T> {
/// acquired.
rem: AtomicUsize,
- /// Uniquely identifies the `send` stored in the slot
+ /// Uniquely identifies the `send` stored in the slot.
pos: u64,
/// True signals the channel is closed.
@@ -346,9 +346,9 @@ struct Slot<T> {
val: UnsafeCell<Option<T>>,
}
-/// An entry in the wait queue
+/// An entry in the wait queue.
struct Waiter {
- /// True if queued
+ /// True if queued.
queued: bool,
/// Task waiting on the broadcast channel.
@@ -365,12 +365,12 @@ struct RecvGuard<'a, T> {
slot: RwLockReadGuard<'a, Slot<T>>,
}
-/// Receive a value future
+/// A future for receiving a value.
struct Recv<'a, T> {
- /// Receiver being waited on
+ /// Receiver being waited on.
receiver: &'a mut Receiver<T>,
- /// Entry in the waiter `LinkedList`
+ /// Entry in the waiter `LinkedList`.
waiter: UnsafeCell<Waiter>,
}
diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs
index 6e7b700..58f4a9f 100644
--- a/src/sync/mpsc/block.rs
+++ b/src/sync/mpsc/block.rs
@@ -40,7 +40,7 @@ struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);
use super::BLOCK_CAP;
-/// Masks an index to get the block identifier
+/// Masks an index to get the block identifier.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);
/// Masks an index to get the value offset in a block.
@@ -89,7 +89,7 @@ impl<T> Block<T> {
}
}
- /// Returns `true` if the block matches the given index
+ /// Returns `true` if the block matches the given index.
pub(crate) fn is_at_index(&self, index: usize) -> bool {
debug_assert!(offset(index) == 0);
self.start_index == index
diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs
index bcad84d..5a2bfa6 100644
--- a/src/sync/mpsc/bounded.rs
+++ b/src/sync/mpsc/bounded.rs
@@ -10,7 +10,7 @@ cfg_time! {
use std::fmt;
use std::task::{Context, Poll};
-/// Send values to the associated `Receiver`.
+/// Sends values to the associated `Receiver`.
///
/// Instances are created by the [`channel`](channel) function.
///
@@ -22,7 +22,7 @@ pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
}
-/// Permit to send one value into the channel.
+/// A permit to send one value into the channel.
///
/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
/// and are used to guarantee channel capacity before generating a message to send.
@@ -49,7 +49,7 @@ pub struct OwnedPermit<T> {
chan: Option<chan::Tx<T, Semaphore>>,
}
-/// Receive values from the associated `Sender`.
+/// Receives values from the associated `Sender`.
///
/// Instances are created by the [`channel`](channel) function.
///
@@ -57,7 +57,7 @@ pub struct OwnedPermit<T> {
///
/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
- /// The channel receiver
+ /// The channel receiver.
chan: chan::Rx<T, Semaphore>,
}
@@ -187,7 +187,7 @@ impl<T> Receiver<T> {
poll_fn(|cx| self.chan.recv(cx)).await
}
- /// Try to receive the next value for this receiver.
+ /// Tries to receive the next value for this receiver.
///
/// This method returns the [`Empty`] error if the channel is currently
/// empty, but there are still outstanding [senders] or [permits].
@@ -672,7 +672,7 @@ impl<T> Sender<T> {
self.chan.is_closed()
}
- /// Wait for channel capacity. Once capacity to send one message is
+ /// Waits for channel capacity. Once capacity to send one message is
/// available, it is reserved for the caller.
///
/// If the channel is full, the function waits for the number of unreceived
@@ -721,7 +721,7 @@ impl<T> Sender<T> {
Ok(Permit { chan: &self.chan })
}
- /// Wait for channel capacity, moving the `Sender` and returning an owned
+ /// Waits for channel capacity, moving the `Sender` and returning an owned
/// permit. Once capacity to send one message is available, it is reserved
/// for the caller.
///
@@ -815,7 +815,7 @@ impl<T> Sender<T> {
}
}
- /// Try to acquire a slot in the channel without waiting for the slot to become
+ /// Tries to acquire a slot in the channel without waiting for the slot to become
/// available.
///
/// If the channel is full this function will return [`TrySendError`], otherwise
@@ -868,7 +868,7 @@ impl<T> Sender<T> {
Ok(Permit { chan: &self.chan })
}
- /// Try to acquire a slot in the channel without waiting for the slot to become
+ /// Tries to acquire a slot in the channel without waiting for the slot to become
/// available, returning an owned permit.
///
/// This moves the sender _by value_, and returns an owned permit that can
@@ -1117,7 +1117,7 @@ impl<T> OwnedPermit<T> {
Sender { chan }
}
- /// Release the reserved capacity *without* sending a message, returning the
+ /// Releases the reserved capacity *without* sending a message, returning the
/// [`Sender`].
///
/// # Examples
diff --git a/src/sync/mpsc/chan.rs b/src/sync/mpsc/chan.rs
index 637ae1f..c3007de 100644
--- a/src/sync/mpsc/chan.rs
+++ b/src/sync/mpsc/chan.rs
@@ -14,7 +14,7 @@ use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
-/// Channel sender
+/// Channel sender.
pub(crate) struct Tx<T, S> {
inner: Arc<Chan<T, S>>,
}
@@ -25,7 +25,7 @@ impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
}
}
-/// Channel receiver
+/// Channel receiver.
pub(crate) struct Rx<T, S: Semaphore> {
inner: Arc<Chan<T, S>>,
}
@@ -47,7 +47,7 @@ pub(crate) trait Semaphore {
}
struct Chan<T, S> {
- /// Notifies all tasks listening for the receiver being dropped
+ /// Notifies all tasks listening for the receiver being dropped.
notify_rx_closed: Notify,
/// Handle to the push half of the lock-free list.
diff --git a/src/sync/mpsc/error.rs b/src/sync/mpsc/error.rs
index 48ca379..b7b9cf7 100644
--- a/src/sync/mpsc/error.rs
+++ b/src/sync/mpsc/error.rs
@@ -1,4 +1,4 @@
-//! Channel error types
+//! Channel error types.
use std::error::Error;
use std::fmt;
diff --git a/src/sync/mpsc/list.rs b/src/sync/mpsc/list.rs
index 53c34d2..e4eeb45 100644
--- a/src/sync/mpsc/list.rs
+++ b/src/sync/mpsc/list.rs
@@ -8,7 +8,7 @@ use std::fmt;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
-/// List queue transmit handle
+/// List queue transmit handle.
pub(crate) struct Tx<T> {
/// Tail in the `Block` mpmc list.
block_tail: AtomicPtr<Block<T>>,
@@ -79,7 +79,7 @@ impl<T> Tx<T> {
}
}
- /// Closes the send half of the list
+ /// Closes the send half of the list.
///
/// Similar process as pushing a value, but instead of writing the value &
/// setting the ready flag, the TX_CLOSED flag is set on the block.
diff --git a/src/sync/mpsc/unbounded.rs b/src/sync/mpsc/unbounded.rs
index 8961930..b133f9f 100644
--- a/src/sync/mpsc/unbounded.rs
+++ b/src/sync/mpsc/unbounded.rs
@@ -129,7 +129,7 @@ impl<T> UnboundedReceiver<T> {
poll_fn(|cx| self.poll_recv(cx)).await
}
- /// Try to receive the next value for this receiver.
+ /// Tries to receive the next value for this receiver.
///
/// This method returns the [`Empty`] error if the channel is currently
/// empty, but there are still outstanding [senders] or [permits].
diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 6acd28b..4d9f988 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -301,6 +301,40 @@ impl<T: ?Sized> Mutex<T> {
MutexGuard { lock: self }
}
+ /// Blockingly locks this mutex. When the lock has been acquired, this function returns a
+ /// [`MutexGuard`].
+ ///
+ /// This method is intended for use cases where you
+ /// need to use this mutex in asynchronous code as well as in synchronous code.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::Mutex;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mutex = Arc::new(Mutex::new(1));
+ ///
+ /// let mutex1 = Arc::clone(&mutex);
+ /// let sync_code = tokio::task::spawn_blocking(move || {
+ /// let mut n = mutex1.blocking_lock();
+ /// *n = 2;
+ /// });
+ ///
+ /// sync_code.await.unwrap();
+ ///
+ /// let n = mutex.lock().await;
+ /// assert_eq!(*n, 2);
+ /// }
+ ///
+ /// ```
+ #[cfg(feature = "sync")]
+ pub fn blocking_lock(&self) -> MutexGuard<'_, T> {
+ crate::future::block_on(self.lock())
+ }
+
/// Locks this mutex, causing the current task to yield until the lock has
/// been acquired. When the lock has been acquired, this returns an
/// [`OwnedMutexGuard`].
@@ -462,14 +496,14 @@ where
}
}
-impl<T> std::fmt::Debug for Mutex<T>
+impl<T: ?Sized> std::fmt::Debug for Mutex<T>
where
T: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("Mutex");
match self.try_lock() {
- Ok(inner) => d.field("data", &*inner),
+ Ok(inner) => d.field("data", &&*inner),
Err(_) => d.field("data", &format_args!("<locked>")),
};
d.finish()
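The `blocking_lock` method added above is aimed at synchronous callers. As a minimal sketch that is not part of the patch (it assumes a Tokio build with the `sync` feature; the counter value is illustrative only), the same mutex can also be locked from a plain `std::thread` that runs outside any runtime:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

fn main() {
    let mutex = Arc::new(Mutex::new(0u32));

    let for_thread = Arc::clone(&mutex);
    // A plain OS thread, outside any Tokio runtime, takes the lock
    // synchronously via `blocking_lock`.
    let handle = std::thread::spawn(move || {
        let mut guard = for_thread.blocking_lock();
        *guard += 1;
    });
    handle.join().unwrap();

    // The main thread is also synchronous here, so it can use the same call.
    assert_eq!(*mutex.blocking_lock(), 1);
}
```

As with the doc example in the hunk itself, the point is that one `tokio::sync::Mutex` can be shared between asynchronous tasks and blocking threads.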
diff --git a/src/sync/notify.rs b/src/sync/notify.rs
index 74b97cc..c93ce3b 100644
--- a/src/sync/notify.rs
+++ b/src/sync/notify.rs
@@ -20,7 +20,7 @@ use std::task::{Context, Poll, Waker};
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
-/// Notify a single task to wake up.
+/// Notifies a single task to wake up.
///
/// `Notify` provides a basic mechanism to notify a single task of an event.
/// `Notify` itself does not carry any data. Instead, it is to be used to signal
@@ -57,13 +57,16 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// let notify = Arc::new(Notify::new());
/// let notify2 = notify.clone();
///
-/// tokio::spawn(async move {
+/// let handle = tokio::spawn(async move {
/// notify2.notified().await;
/// println!("received notification");
/// });
///
/// println!("sending notification");
/// notify.notify_one();
+///
+/// // Wait for task to receive notification.
+/// handle.await.unwrap();
/// }
/// ```
///
@@ -128,10 +131,10 @@ enum NotificationType {
#[derive(Debug)]
struct Waiter {
- /// Intrusive linked-list pointers
+ /// Intrusive linked-list pointers.
pointers: linked_list::Pointers<Waiter>,
- /// Waiting task's waker
+ /// Waiting task's waker.
waker: Option<Waker>,
/// `true` if the notification has been assigned to this waiter.
@@ -168,13 +171,13 @@ const NOTIFY_WAITERS_SHIFT: usize = 2;
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;
-/// Initial "idle" state
+/// Initial "idle" state.
const EMPTY: usize = 0;
/// One or more threads are currently waiting to be notified.
const WAITING: usize = 1;
-/// Pending notification
+/// Pending notification.
const NOTIFIED: usize = 2;
fn set_state(data: usize, state: usize) -> usize {
@@ -289,7 +292,7 @@ impl Notify {
}
}
- /// Notifies a waiting task
+ /// Notifies a waiting task.
///
/// If a task is currently waiting, that task is notified. Otherwise, a
/// permit is stored in this `Notify` value and the **next** call to
@@ -359,7 +362,7 @@ impl Notify {
}
}
- /// Notifies all waiting tasks
+ /// Notifies all waiting tasks.
///
/// If a task is currently waiting, that task is notified. Unlike with
/// `notify_one()`, no permit is stored to be used by the next call to
@@ -551,6 +554,10 @@ impl Future for Notified<'_> {
return Poll::Ready(());
}
+ // Clone the waker before locking; cloning a waker can trigger
+ // arbitrary code.
+ let waker = cx.waker().clone();
+
// Acquire the lock and attempt to transition to the waiting
// state.
let mut waiters = notify.waiters.lock();
@@ -612,7 +619,7 @@ impl Future for Notified<'_> {
// Safety: called while locked.
unsafe {
- (*waiter.get()).waker = Some(cx.waker().clone());
+ (*waiter.get()).waker = Some(waker);
}
// Insert the waiter into the linked list
diff --git a/src/sync/once_cell.rs b/src/sync/once_cell.rs
index 91705a5..d31a40e 100644
--- a/src/sync/once_cell.rs
+++ b/src/sync/once_cell.rs
@@ -245,7 +245,7 @@ impl<T> OnceCell<T> {
}
}
- /// Set the value of the `OnceCell` to the given value if the `OnceCell` is
+ /// Sets the value of the `OnceCell` to the given value if the `OnceCell` is
/// empty.
///
/// If the `OnceCell` already has a value, this call will fail with an
@@ -283,7 +283,7 @@ impl<T> OnceCell<T> {
}
}
- /// Get the value currently in the `OnceCell`, or initialize it with the
+ /// Gets the value currently in the `OnceCell`, or initialize it with the
/// given asynchronous operation.
///
/// If some other task is currently working on initializing the `OnceCell`,
@@ -331,7 +331,7 @@ impl<T> OnceCell<T> {
}
}
- /// Get the value currently in the `OnceCell`, or initialize it with the
+ /// Gets the value currently in the `OnceCell`, or initialize it with the
/// given asynchronous operation.
///
/// If some other task is currently working on initializing the `OnceCell`,
@@ -382,7 +382,7 @@ impl<T> OnceCell<T> {
}
}
- /// Take the value from the cell, destroying the cell in the process.
+ /// Takes the value from the cell, destroying the cell in the process.
/// Returns `None` if the cell is empty.
pub fn into_inner(mut self) -> Option<T> {
if self.initialized_mut() {
diff --git a/src/sync/oneshot.rs b/src/sync/oneshot.rs
index 0df6037..4fb22ec 100644
--- a/src/sync/oneshot.rs
+++ b/src/sync/oneshot.rs
@@ -51,6 +51,70 @@
//! }
//! }
//! ```
+//!
+//! To use a oneshot channel in a `tokio::select!` loop, add `&mut` in front of
+//! the channel.
+//!
+//! ```
+//! use tokio::sync::oneshot;
+//! use tokio::time::{interval, sleep, Duration};
+//!
+//! #[tokio::main]
+//! # async fn _doc() {}
+//! # #[tokio::main(flavor = "current_thread", start_paused = true)]
+//! async fn main() {
+//! let (send, mut recv) = oneshot::channel();
+//! let mut interval = interval(Duration::from_millis(100));
+//!
+//! # let handle =
+//! tokio::spawn(async move {
+//! sleep(Duration::from_secs(1)).await;
+//! send.send("shut down").unwrap();
+//! });
+//!
+//! loop {
+//! tokio::select! {
+//! _ = interval.tick() => println!("Another 100ms"),
+//! msg = &mut recv => {
+//! println!("Got message: {}", msg.unwrap());
+//! break;
+//! }
+//! }
+//! }
+//! # handle.await.unwrap();
+//! }
+//! ```
+//!
+//! To use a `Sender` from a destructor, put it in an [`Option`] and call
+//! [`Option::take`].
+//!
+//! ```
+//! use tokio::sync::oneshot;
+//!
+//! struct SendOnDrop {
+//! sender: Option<oneshot::Sender<&'static str>>,
+//! }
+//! impl Drop for SendOnDrop {
+//! fn drop(&mut self) {
+//! if let Some(sender) = self.sender.take() {
+//! // Using `let _ =` to ignore send errors.
+//! let _ = sender.send("I got dropped!");
+//! }
+//! }
+//! }
+//!
+//! #[tokio::main]
+//! # async fn _doc() {}
+//! # #[tokio::main(flavor = "current_thread")]
+//! async fn main() {
+//! let (send, recv) = oneshot::channel();
+//!
+//! let send_on_drop = SendOnDrop { sender: Some(send) };
+//! drop(send_on_drop);
+//!
+//! assert_eq!(recv.await, Ok("I got dropped!"));
+//! }
+//! ```
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
@@ -68,16 +132,98 @@ use std::task::{Context, Poll, Waker};
///
/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
/// [`channel`](fn@channel) function.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel();
+///
+/// tokio::spawn(async move {
+/// if let Err(_) = tx.send(3) {
+/// println!("the receiver dropped");
+/// }
+/// });
+///
+/// match rx.await {
+/// Ok(v) => println!("got = {:?}", v),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
+/// ```
+///
+/// If the sender is dropped without sending, the receiver will fail with
+/// [`error::RecvError`]:
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel::<u32>();
+///
+/// tokio::spawn(async move {
+/// drop(tx);
+/// });
+///
+/// match rx.await {
+/// Ok(_) => panic!("This doesn't happen"),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
+/// ```
+///
+/// To use a `Sender` from a destructor, put it in an [`Option`] and call
+/// [`Option::take`].
+///
+/// ```
+/// use tokio::sync::oneshot;
+///
+/// struct SendOnDrop {
+/// sender: Option<oneshot::Sender<&'static str>>,
+/// }
+/// impl Drop for SendOnDrop {
+/// fn drop(&mut self) {
+/// if let Some(sender) = self.sender.take() {
+/// // Using `let _ =` to ignore send errors.
+/// let _ = sender.send("I got dropped!");
+/// }
+/// }
+/// }
+///
+/// #[tokio::main]
+/// # async fn _doc() {}
+/// # #[tokio::main(flavor = "current_thread")]
+/// async fn main() {
+/// let (send, recv) = oneshot::channel();
+///
+/// let send_on_drop = SendOnDrop { sender: Some(send) };
+/// drop(send_on_drop);
+///
+/// assert_eq!(recv.await, Ok("I got dropped!"));
+/// }
+/// ```
+///
+/// [`Option`]: std::option::Option
+/// [`Option::take`]: std::option::Option::take
#[derive(Debug)]
pub struct Sender<T> {
inner: Option<Arc<Inner<T>>>,
}
-/// Receive a value from the associated [`Sender`].
+/// Receives a value from the associated [`Sender`].
///
/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
/// [`channel`](fn@channel) function.
///
+/// This channel has no `recv` method because the receiver itself implements the
+/// [`Future`] trait. To receive a value, `.await` the `Receiver` object directly.
+///
+/// [`Future`]: trait@std::future::Future
+///
/// # Examples
///
/// ```
@@ -120,13 +266,46 @@ pub struct Sender<T> {
/// }
/// }
/// ```
+///
+/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the
+/// channel.
+///
+/// ```
+/// use tokio::sync::oneshot;
+/// use tokio::time::{interval, sleep, Duration};
+///
+/// #[tokio::main]
+/// # async fn _doc() {}
+/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+/// async fn main() {
+/// let (send, mut recv) = oneshot::channel();
+/// let mut interval = interval(Duration::from_millis(100));
+///
+/// # let handle =
+/// tokio::spawn(async move {
+/// sleep(Duration::from_secs(1)).await;
+/// send.send("shut down").unwrap();
+/// });
+///
+/// loop {
+/// tokio::select! {
+/// _ = interval.tick() => println!("Another 100ms"),
+/// msg = &mut recv => {
+/// println!("Got message: {}", msg.unwrap());
+/// break;
+/// }
+/// }
+/// }
+/// # handle.await.unwrap();
+/// }
+/// ```
#[derive(Debug)]
pub struct Receiver<T> {
inner: Option<Arc<Inner<T>>>,
}
pub mod error {
- //! Oneshot error types
+ //! Oneshot error types.
use std::fmt;
@@ -171,7 +350,7 @@ pub mod error {
use self::error::*;
struct Inner<T> {
- /// Manages the state of the inner cell
+ /// Manages the state of the inner cell.
state: AtomicUsize,
/// The value. This is set by `Sender` and read by `Receiver`. The state of
@@ -179,9 +358,19 @@ struct Inner<T> {
value: UnsafeCell<Option<T>>,
/// The task to notify when the receiver drops without consuming the value.
+ ///
+ /// ## Safety
+ ///
+ /// The `TX_TASK_SET` bit in the `state` field is set if this field is
+ /// initialized. If that bit is unset, this field may be uninitialized.
tx_task: Task,
/// The task to notify when the value is sent.
+ ///
+ /// ## Safety
+ ///
+ /// The `RX_TASK_SET` bit in the `state` field is set if this field is
+ /// initialized. If that bit is unset, this field may be uninitialized.
rx_task: Task,
}
@@ -220,7 +409,7 @@ impl Task {
#[derive(Clone, Copy)]
struct State(usize);
-/// Create a new one-shot channel for sending single values across asynchronous
+/// Creates a new one-shot channel for sending single values across asynchronous
/// tasks.
///
/// The function returns separate "send" and "receive" handles. The `Sender`
@@ -311,11 +500,24 @@ impl<T> Sender<T> {
let inner = self.inner.take().unwrap();
inner.value.with_mut(|ptr| unsafe {
+ // SAFETY: The receiver will not access the `UnsafeCell` unless the
+ // channel has been marked as "complete" (the `VALUE_SENT` state bit
+ // is set).
+ // That bit is only set by the sender later on in this method, and
+ // calling this method consumes `self`. Therefore, if it was possible to
+ // call this method, we know that the `VALUE_SENT` bit is unset, and
+ // the receiver is not currently accessing the `UnsafeCell`.
*ptr = Some(t);
});
if !inner.complete() {
unsafe {
+ // SAFETY: The receiver will not access the `UnsafeCell` unless
+ // the channel has been marked as "complete". Calling
+ // `complete()` will return true if this bit is set, and false
+ // if it is not set. Thus, if `complete()` returned false, it is
+ // safe for us to access the value, because we know that the
+ // receiver will not.
return Err(inner.consume_value().unwrap());
}
}
@@ -430,7 +632,7 @@ impl<T> Sender<T> {
state.is_closed()
}
- /// Check whether the oneshot channel has been closed, and if not, schedules the
+ /// Checks whether the oneshot channel has been closed, and if not, schedules the
/// `Waker` in the provided `Context` to receive a notification when the channel is
/// closed.
///
@@ -661,6 +863,11 @@ impl<T> Receiver<T> {
let state = State::load(&inner.state, Acquire);
if state.is_complete() {
+ // SAFETY: If `state.is_complete()` returns true, then the
+ // `VALUE_SENT` bit has been set and the sender side of the
+ // channel will no longer attempt to access the inner
+ // `UnsafeCell`. Therefore, it is now safe for us to access the
+ // cell.
match unsafe { inner.consume_value() } {
Some(value) => Ok(value),
None => Err(TryRecvError::Closed),
@@ -751,6 +958,11 @@ impl<T> Inner<T> {
State::set_rx_task(&self.state);
coop.made_progress();
+ // SAFETY: If `state.is_complete()` returns true, then the
+ // `VALUE_SENT` bit has been set and the sender side of the
+ // channel will no longer attempt to access the inner
+ // `UnsafeCell`. Therefore, it is now safe for us to access the
+ // cell.
return match unsafe { self.consume_value() } {
Some(value) => Ready(Ok(value)),
None => Ready(Err(RecvError(()))),
@@ -797,6 +1009,14 @@ impl<T> Inner<T> {
}
/// Consumes the value. This function does not check `state`.
+ ///
+ /// # Safety
+ ///
+ /// Calling this method concurrently on multiple threads will result in a
+ /// data race. The `VALUE_SENT` state bit is used to ensure that only the
+ /// sender *or* the receiver will call this method at a given point in time.
+ /// If `VALUE_SENT` is not set, then only the sender may call this method;
+ /// if it is set, then only the receiver may call this method.
unsafe fn consume_value(&self) -> Option<T> {
self.value.with_mut(|ptr| (*ptr).take())
}
@@ -837,9 +1057,28 @@ impl<T: fmt::Debug> fmt::Debug for Inner<T> {
}
}
+/// Indicates that a waker for the receiving task has been set.
+///
+/// # Safety
+///
+/// If this bit is not set, the `rx_task` field may be uninitialized.
const RX_TASK_SET: usize = 0b00001;
+/// Indicates that a value has been stored in the channel's inner `UnsafeCell`.
+///
+/// # Safety
+///
+/// This bit controls which side of the channel is permitted to access the
+/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the
+/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by
+/// the sender.
const VALUE_SENT: usize = 0b00010;
const CLOSED: usize = 0b00100;
+
+/// Indicates that a waker for the sending task has been set.
+///
+/// # Safety
+///
+/// If this bit is not set, the `tx_task` field may be uninitialized.
const TX_TASK_SET: usize = 0b01000;
impl State {
@@ -852,11 +1091,38 @@ impl State {
}
fn set_complete(cell: &AtomicUsize) -> State {
- // TODO: This could be `Release`, followed by an `Acquire` fence *if*
- // the `RX_TASK_SET` flag is set. However, `loom` does not support
- // fences yet.
- let val = cell.fetch_or(VALUE_SENT, AcqRel);
- State(val)
+ // This method is a compare-and-swap loop rather than a fetch-or like
+ // other `set_$WHATEVER` methods on `State`. This is because we must
+ // check if the state has been closed before setting the `VALUE_SENT`
+ // bit.
+ //
+ // We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
+ // bit is already set, because `VALUE_SENT` will tell the receiver that
+ // it's okay to access the inner `UnsafeCell`. Immediately after calling
+ // `set_complete`, if the channel was closed, the sender will _also_
+ // access the `UnsafeCell` to take the value back out, so if a
+ // `poll_recv` or `try_recv` call is occurring concurrently, both
+ // threads may try to access the `UnsafeCell` if we were to set the
+ // `VALUE_SENT` bit on a closed channel.
+ let mut state = cell.load(Ordering::Relaxed);
+ loop {
+ if State(state).is_closed() {
+ break;
+ }
+ // TODO: This could be `Release`, followed by an `Acquire` fence *if*
+ // the `RX_TASK_SET` flag is set. However, `loom` does not support
+ // fences yet.
+ match cell.compare_exchange_weak(
+ state,
+ state | VALUE_SENT,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(actual) => state = actual,
+ }
+ }
+ State(state)
}
fn is_rx_task_set(self) -> bool {
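The comment in `set_complete` explains why a plain `fetch_or` is not enough: `VALUE_SENT` must not be set once `CLOSED` has been observed. A standalone sketch of that compare-and-swap pattern, not taken from the patch and using constant names that merely mirror the ones above:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const VALUE_SENT: usize = 0b00010;
const CLOSED: usize = 0b00100;

/// Sets `VALUE_SENT` unless the state is already `CLOSED`, returning the
/// state that was observed, in the spirit of `State::set_complete`.
fn set_value_sent_unless_closed(cell: &AtomicUsize) -> usize {
    let mut state = cell.load(Ordering::Relaxed);
    loop {
        if state & CLOSED != 0 {
            // Already closed: do NOT hand the cell over to the receiver.
            return state;
        }
        match cell.compare_exchange_weak(
            state,
            state | VALUE_SENT,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return state,
            Err(actual) => state = actual,
        }
    }
}

fn main() {
    let open = AtomicUsize::new(0);
    set_value_sent_unless_closed(&open);
    assert_eq!(open.load(Ordering::Relaxed) & VALUE_SENT, VALUE_SENT);

    let closed = AtomicUsize::new(CLOSED);
    set_value_sent_unless_closed(&closed);
    // On a closed channel the bit is left unset, so only one side of the
    // channel will ever touch the `UnsafeCell`.
    assert_eq!(closed.load(Ordering::Relaxed) & VALUE_SENT, 0);
}
```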
diff --git a/src/sync/rwlock/owned_read_guard.rs b/src/sync/rwlock/owned_read_guard.rs
index b7f3926..1881295 100644
--- a/src/sync/rwlock/owned_read_guard.rs
+++ b/src/sync/rwlock/owned_read_guard.rs
@@ -22,7 +22,7 @@ pub struct OwnedRwLockReadGuard<T: ?Sized, U: ?Sized = T> {
}
impl<T: ?Sized, U: ?Sized> OwnedRwLockReadGuard<T, U> {
- /// Make a new `OwnedRwLockReadGuard` for a component of the locked data.
+ /// Makes a new `OwnedRwLockReadGuard` for a component of the locked data.
/// This operation cannot fail as the `OwnedRwLockReadGuard` passed in
/// already locked the data.
///
diff --git a/src/sync/rwlock/owned_write_guard.rs b/src/sync/rwlock/owned_write_guard.rs
index 91b6595..0a78d28 100644
--- a/src/sync/rwlock/owned_write_guard.rs
+++ b/src/sync/rwlock/owned_write_guard.rs
@@ -24,7 +24,7 @@ pub struct OwnedRwLockWriteGuard<T: ?Sized> {
}
impl<T: ?Sized> OwnedRwLockWriteGuard<T> {
- /// Make a new [`OwnedRwLockMappedWriteGuard`] for a component of the locked
+ /// Makes a new [`OwnedRwLockMappedWriteGuard`] for a component of the locked
/// data.
///
/// This operation cannot fail as the `OwnedRwLockWriteGuard` passed in
diff --git a/src/sync/rwlock/owned_write_guard_mapped.rs b/src/sync/rwlock/owned_write_guard_mapped.rs
index 6453236..d88ee01 100644
--- a/src/sync/rwlock/owned_write_guard_mapped.rs
+++ b/src/sync/rwlock/owned_write_guard_mapped.rs
@@ -23,7 +23,7 @@ pub struct OwnedRwLockMappedWriteGuard<T: ?Sized, U: ?Sized = T> {
}
impl<T: ?Sized, U: ?Sized> OwnedRwLockMappedWriteGuard<T, U> {
- /// Make a new `OwnedRwLockMappedWriteGuard` for a component of the locked
+ /// Makes a new `OwnedRwLockMappedWriteGuard` for a component of the locked
/// data.
///
/// This operation cannot fail as the `OwnedRwLockMappedWriteGuard` passed
diff --git a/src/sync/rwlock/read_guard.rs b/src/sync/rwlock/read_guard.rs
index 38eec77..090b297 100644
--- a/src/sync/rwlock/read_guard.rs
+++ b/src/sync/rwlock/read_guard.rs
@@ -19,7 +19,7 @@ pub struct RwLockReadGuard<'a, T: ?Sized> {
}
impl<'a, T: ?Sized> RwLockReadGuard<'a, T> {
- /// Make a new `RwLockReadGuard` for a component of the locked data.
+ /// Makes a new `RwLockReadGuard` for a component of the locked data.
///
/// This operation cannot fail as the `RwLockReadGuard` passed in already
/// locked the data.
diff --git a/src/sync/rwlock/write_guard.rs b/src/sync/rwlock/write_guard.rs
index 865a121..8c80ee7 100644
--- a/src/sync/rwlock/write_guard.rs
+++ b/src/sync/rwlock/write_guard.rs
@@ -22,7 +22,7 @@ pub struct RwLockWriteGuard<'a, T: ?Sized> {
}
impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
- /// Make a new [`RwLockMappedWriteGuard`] for a component of the locked data.
+ /// Makes a new [`RwLockMappedWriteGuard`] for a component of the locked data.
///
/// This operation cannot fail as the `RwLockWriteGuard` passed in already
/// locked the data.
diff --git a/src/sync/rwlock/write_guard_mapped.rs b/src/sync/rwlock/write_guard_mapped.rs
index 9c5b1e7..3cf69de 100644
--- a/src/sync/rwlock/write_guard_mapped.rs
+++ b/src/sync/rwlock/write_guard_mapped.rs
@@ -21,7 +21,7 @@ pub struct RwLockMappedWriteGuard<'a, T: ?Sized> {
}
impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> {
- /// Make a new `RwLockMappedWriteGuard` for a component of the locked data.
+ /// Makes a new `RwLockMappedWriteGuard` for a component of the locked data.
///
/// This operation cannot fail as the `RwLockMappedWriteGuard` passed in already
/// locked the data.
diff --git a/src/sync/task/atomic_waker.rs b/src/sync/task/atomic_waker.rs
index 8616007..e1330fb 100644
--- a/src/sync/task/atomic_waker.rs
+++ b/src/sync/task/atomic_waker.rs
@@ -4,6 +4,7 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{self, AtomicUsize};
use std::fmt;
+use std::panic::{resume_unwind, AssertUnwindSafe, RefUnwindSafe, UnwindSafe};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::Waker;
@@ -27,6 +28,9 @@ pub(crate) struct AtomicWaker {
waker: UnsafeCell<Option<Waker>>,
}
+impl RefUnwindSafe for AtomicWaker {}
+impl UnwindSafe for AtomicWaker {}
+
// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell
// stores a `Waker` value produced by calls to `register` and many threads can
// race to take the waker by calling `wake`.
@@ -84,7 +88,7 @@ pub(crate) struct AtomicWaker {
// back to `WAITING`. This transition must succeed as, at this point, the state
// cannot be transitioned by another thread.
//
-// If the thread is unable to obtain the lock, the `WAKING` bit is still.
+// If the thread is unable to obtain the lock, the `WAKING` bit is still set.
// This is because it has either been set by the current thread but the previous
// value included the `REGISTERING` bit **or** a concurrent thread is in the
// `WAKING` critical section. Either way, no action must be taken.
@@ -123,7 +127,7 @@ pub(crate) struct AtomicWaker {
// Thread A still holds the `wake` lock, the call to `register` will result
// in the task waking itself and get scheduled again.
-/// Idle state
+/// Idle state.
const WAITING: usize = 0;
/// A new waker value is being registered with the `AtomicWaker` cell.
@@ -171,6 +175,10 @@ impl AtomicWaker {
where
W: WakerRef,
{
+ fn catch_unwind<F: FnOnce() -> R, R>(f: F) -> std::thread::Result<R> {
+ std::panic::catch_unwind(AssertUnwindSafe(f))
+ }
+
match self
.state
.compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
@@ -178,8 +186,24 @@ impl AtomicWaker {
{
WAITING => {
unsafe {
- // Locked acquired, update the waker cell
- self.waker.with_mut(|t| *t = Some(waker.into_waker()));
+ // If `into_waker` panics (because it's code outside of
+ // AtomicWaker) we need to prime a guard that is called on
+ // unwind to restore the waker to a WAITING state. Otherwise
+ // any future call to register will incorrectly be stuck
+ // believing it is being updated by someone else.
+ let new_waker_or_panic = catch_unwind(move || waker.into_waker());
+
+ // Set the field to contain the new waker, or if
+ // `into_waker` panicked, leave the old value.
+ let mut maybe_panic = None;
+ let mut old_waker = None;
+ match new_waker_or_panic {
+ Ok(new_waker) => {
+ old_waker = self.waker.with_mut(|t| (*t).take());
+ self.waker.with_mut(|t| *t = Some(new_waker));
+ }
+ Err(panic) => maybe_panic = Some(panic),
+ }
// Release the lock. If the state transitioned to include
// the `WAKING` bit, this means that a wake has been
@@ -193,33 +217,67 @@ impl AtomicWaker {
.compare_exchange(REGISTERING, WAITING, AcqRel, Acquire);
match res {
- Ok(_) => {}
+ Ok(_) => {
+ // We don't want to give the caller the panic if it
+ // was someone else who put in that waker.
+ let _ = catch_unwind(move || {
+ drop(old_waker);
+ });
+ }
Err(actual) => {
// This branch can only be reached if a
// concurrent thread called `wake`. In this
// case, `actual` **must** be `REGISTERING |
- // `WAKING`.
+ // WAKING`.
debug_assert_eq!(actual, REGISTERING | WAKING);
// Take the waker to wake once the atomic operation has
// completed.
- let waker = self.waker.with_mut(|t| (*t).take()).unwrap();
+ let mut waker = self.waker.with_mut(|t| (*t).take());
// Just swap, because no one could change state
// while state == `Registering | `Waking`
self.state.swap(WAITING, AcqRel);
- // The atomic swap was complete, now
- // wake the waker and return.
- waker.wake();
+ // If `into_waker` panicked, then the waker in the
+ // waker slot is actually the old waker.
+ if maybe_panic.is_some() {
+ old_waker = waker.take();
+ }
+
+ // We don't want to give the caller the panic if it
+ // was someone else who put in that waker.
+ if let Some(old_waker) = old_waker {
+ let _ = catch_unwind(move || {
+ old_waker.wake();
+ });
+ }
+
+ // The atomic swap was complete, now wake the waker
+ // and return.
+ //
+ // If this panics, we end up in a consumed state and
+ // return the panic to the caller.
+ if let Some(waker) = waker {
+ debug_assert!(maybe_panic.is_none());
+ waker.wake();
+ }
}
}
+
+ if let Some(panic) = maybe_panic {
+ // If `into_waker` panicked, return the panic to the caller.
+ resume_unwind(panic);
+ }
}
}
WAKING => {
// Currently in the process of waking the task, i.e.,
// `wake` is currently being called on the old waker.
// So, we call wake on the new waker.
+ //
+ // If this panics, someone else is responsible for restoring the
+ // state of the waker.
waker.wake();
// This is equivalent to a spin lock, so use a spin hint.
@@ -245,6 +303,8 @@ impl AtomicWaker {
/// If `register` has not been called yet, then this does nothing.
pub(crate) fn wake(&self) {
if let Some(waker) = self.take_waker() {
+ // If wake panics, we've consumed the waker which is a legitimate
+ // outcome.
waker.wake();
}
}
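The pattern used throughout the `register` changes above — run untrusted waker code under `catch_unwind`, restore the `AtomicWaker` state, and only then re-raise the panic — can be shown in isolation. This is a simplified sketch rather than the driver code itself; `with_registration` and the `REGISTERING` flag are invented for the example:

```rust
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};

static REGISTERING: AtomicBool = AtomicBool::new(false);

/// Calls caller-supplied code while a "registering" flag is held, making sure
/// the flag is cleared even if that code panics, then re-raises the panic.
fn with_registration<F: FnOnce()>(guest: F) {
    REGISTERING.store(true, Ordering::Release);

    // Catch any panic coming out of the guest code so state can be restored.
    let result = catch_unwind(AssertUnwindSafe(guest));

    // The flag is restored unconditionally...
    REGISTERING.store(false, Ordering::Release);

    // ...and the caller still sees their own panic afterwards.
    if let Err(panic) = result {
        resume_unwind(panic);
    }
}

fn main() {
    let outcome = catch_unwind(|| with_registration(|| panic!("boom")));
    assert!(outcome.is_err());
    // The flag is not left stuck at `true` by the panicking guest.
    assert!(!REGISTERING.load(Ordering::Acquire));
}
```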
diff --git a/src/sync/tests/atomic_waker.rs b/src/sync/tests/atomic_waker.rs
index c832d62..b167a5d 100644
--- a/src/sync/tests/atomic_waker.rs
+++ b/src/sync/tests/atomic_waker.rs
@@ -32,3 +32,42 @@ fn wake_without_register() {
assert!(!waker.is_woken());
}
+
+#[test]
+fn atomic_waker_panic_safe() {
+ use std::panic;
+ use std::ptr;
+ use std::task::{RawWaker, RawWakerVTable, Waker};
+
+ static PANICKING_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ |_| panic!("clone"),
+ |_| unimplemented!("wake"),
+ |_| unimplemented!("wake_by_ref"),
+ |_| (),
+ );
+
+ static NONPANICKING_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ |_| RawWaker::new(ptr::null(), &NONPANICKING_VTABLE),
+ |_| unimplemented!("wake"),
+ |_| unimplemented!("wake_by_ref"),
+ |_| (),
+ );
+
+ let panicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &PANICKING_VTABLE)) };
+ let nonpanicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NONPANICKING_VTABLE)) };
+
+ let atomic_waker = AtomicWaker::new();
+
+ let panicking = panic::AssertUnwindSafe(&panicking);
+
+ let result = panic::catch_unwind(|| {
+ let panic::AssertUnwindSafe(panicking) = panicking;
+ atomic_waker.register_by_ref(panicking);
+ });
+
+ assert!(result.is_err());
+ assert!(atomic_waker.take_waker().is_none());
+
+ atomic_waker.register_by_ref(&nonpanicking);
+ assert!(atomic_waker.take_waker().is_some());
+}
diff --git a/src/sync/tests/loom_atomic_waker.rs b/src/sync/tests/loom_atomic_waker.rs
index c148bcb..f8bae65 100644
--- a/src/sync/tests/loom_atomic_waker.rs
+++ b/src/sync/tests/loom_atomic_waker.rs
@@ -43,3 +43,58 @@ fn basic_notification() {
}));
});
}
+
+#[test]
+fn test_panicky_waker() {
+ use std::panic;
+ use std::ptr;
+ use std::task::{RawWaker, RawWakerVTable, Waker};
+
+ static PANICKING_VTABLE: RawWakerVTable =
+ RawWakerVTable::new(|_| panic!("clone"), |_| (), |_| (), |_| ());
+
+ let panicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &PANICKING_VTABLE)) };
+
+ // If you're working with this test (and I sure hope you never have to!),
+ // uncomment the following section because there will be a lot of panics
+ // which would otherwise log.
+ //
+ // We can't, however, leave it uncommented, because it's global.
+ // panic::set_hook(Box::new(|_| ()));
+
+ const NUM_NOTIFY: usize = 2;
+
+ loom::model(move || {
+ let chan = Arc::new(Chan {
+ num: AtomicUsize::new(0),
+ task: AtomicWaker::new(),
+ });
+
+ for _ in 0..NUM_NOTIFY {
+ let chan = chan.clone();
+
+ thread::spawn(move || {
+ chan.num.fetch_add(1, Relaxed);
+ chan.task.wake();
+ });
+ }
+
+ // Note: this panic should have no effect on the overall state of the
+ // waker and it should proceed as normal.
+ //
+ // A thread above might race to flag a wakeup, and a WAKING state will
+ // be preserved if this expected panic races with that, so the procedure
+ // below should be allowed to continue uninterrupted.
+ let _ = panic::catch_unwind(|| chan.task.register_by_ref(&panicking));
+
+ block_on(poll_fn(move |cx| {
+ chan.task.register_by_ref(cx.waker());
+
+ if NUM_NOTIFY == chan.num.load(Relaxed) {
+ return Ready(());
+ }
+
+ Pending
+ }));
+ });
+}
diff --git a/src/sync/tests/loom_oneshot.rs b/src/sync/tests/loom_oneshot.rs
index 9729cfb..c5f7972 100644
--- a/src/sync/tests/loom_oneshot.rs
+++ b/src/sync/tests/loom_oneshot.rs
@@ -55,6 +55,35 @@ fn changing_rx_task() {
});
}
+#[test]
+fn try_recv_close() {
+ // reproduces https://github.com/tokio-rs/tokio/issues/4225
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+ thread::spawn(move || {
+ let _ = tx.send(());
+ });
+
+ rx.close();
+ let _ = rx.try_recv();
+ })
+}
+
+#[test]
+fn recv_closed() {
+ // reproduces https://github.com/tokio-rs/tokio/issues/4225
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ let _ = tx.send(1);
+ });
+
+ rx.close();
+ let _ = block_on(rx);
+ });
+}
+
// TODO: Move this into `oneshot` proper.
use std::future::Future;
diff --git a/src/sync/tests/mod.rs b/src/sync/tests/mod.rs
index c5d5601..ee76418 100644
--- a/src/sync/tests/mod.rs
+++ b/src/sync/tests/mod.rs
@@ -1,5 +1,6 @@
cfg_not_loom! {
mod atomic_waker;
+ mod notify;
mod semaphore_batch;
}
diff --git a/src/sync/tests/notify.rs b/src/sync/tests/notify.rs
new file mode 100644
index 0000000..8c9a573
--- /dev/null
+++ b/src/sync/tests/notify.rs
@@ -0,0 +1,44 @@
+use crate::sync::Notify;
+use std::future::Future;
+use std::mem::ManuallyDrop;
+use std::sync::Arc;
+use std::task::{Context, RawWaker, RawWakerVTable, Waker};
+
+#[test]
+fn notify_clones_waker_before_lock() {
+ const VTABLE: &RawWakerVTable = &RawWakerVTable::new(clone_w, wake, wake_by_ref, drop_w);
+
+ unsafe fn clone_w(data: *const ()) -> RawWaker {
+ let arc = ManuallyDrop::new(Arc::<Notify>::from_raw(data as *const Notify));
+ // Or some other arbitrary code that shouldn't be executed while the
+ // Notify wait list is locked.
+ arc.notify_one();
+ let _arc_clone: ManuallyDrop<_> = arc.clone();
+ RawWaker::new(data, VTABLE)
+ }
+
+ unsafe fn drop_w(data: *const ()) {
+ let _ = Arc::<Notify>::from_raw(data as *const Notify);
+ }
+
+ unsafe fn wake(_data: *const ()) {
+ unreachable!()
+ }
+
+ unsafe fn wake_by_ref(_data: *const ()) {
+ unreachable!()
+ }
+
+ let notify = Arc::new(Notify::new());
+ let notify2 = notify.clone();
+
+ let waker =
+ unsafe { Waker::from_raw(RawWaker::new(Arc::into_raw(notify2) as *const _, VTABLE)) };
+ let mut cx = Context::from_waker(&waker);
+
+ let future = notify.notified();
+ pin!(future);
+
+ // The result doesn't matter, we're just testing that we don't deadlock.
+ let _ = future.poll(&mut cx);
+}
diff --git a/src/sync/watch.rs b/src/sync/watch.rs
index b5da218..7e45c11 100644
--- a/src/sync/watch.rs
+++ b/src/sync/watch.rs
@@ -58,6 +58,7 @@ use crate::sync::notify::Notify;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
+use std::mem;
use std::ops;
/// Receives values from the associated [`Sender`](struct@Sender).
@@ -85,7 +86,7 @@ pub struct Sender<T> {
shared: Arc<Shared<T>>,
}
-/// Returns a reference to the inner value
+/// Returns a reference to the inner value.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long lived borrows could cause the produce half to block. It is recommended
@@ -97,27 +98,27 @@ pub struct Ref<'a, T> {
#[derive(Debug)]
struct Shared<T> {
- /// The most recent value
+ /// The most recent value.
value: RwLock<T>,
- /// The current version
+ /// The current version.
///
/// The lowest bit represents a "closed" state. The rest of the bits
/// represent the current version.
state: AtomicState,
- /// Tracks the number of `Receiver` instances
+ /// Tracks the number of `Receiver` instances.
ref_count_rx: AtomicUsize,
/// Notifies waiting receivers that the value changed.
notify_rx: Notify,
- /// Notifies any task listening for `Receiver` dropped events
+ /// Notifies any task listening for `Receiver` dropped events.
notify_tx: Notify,
}
pub mod error {
- //! Watch error types
+ //! Watch error types.
use std::fmt;
@@ -317,7 +318,7 @@ impl<T> Receiver<T> {
Ref { inner }
}
- /// Wait for a change notification, then mark the newest value as seen.
+ /// Waits for a change notification, then marks the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
/// this method is called, the method marks that value seen and returns
@@ -432,10 +433,31 @@ impl<T> Sender<T> {
return Err(error::SendError(value));
}
- {
+ self.send_replace(value);
+ Ok(())
+ }
+
+ /// Sends a new value via the channel, notifying all receivers and returning
+ /// the previous value in the channel.
+ ///
+ /// This can be useful for reusing the buffers inside a watched value.
+ /// Additionally, this method permits sending values even when there are no
+ /// receivers.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// let (tx, _rx) = watch::channel(1);
+ /// assert_eq!(tx.send_replace(2), 1);
+ /// assert_eq!(tx.send_replace(3), 2);
+ /// ```
+ pub fn send_replace(&self, value: T) -> T {
+ let old = {
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
- *lock = value;
+ let old = mem::replace(&mut *lock, value);
self.shared.state.increment_version();
@@ -445,12 +467,14 @@ impl<T> Sender<T> {
// that receivers are able to figure out the version number of the
// value they are currently looking at.
drop(lock);
- }
+
+ old
+ };
// Notify all watchers
self.shared.notify_rx.notify_waiters();
- Ok(())
+ old
}
/// Returns a reference to the most recently sent value
@@ -595,7 +619,7 @@ impl<T> Sender<T> {
Receiver::from_shared(version, shared)
}
- /// Returns the number of receivers that currently exist
+ /// Returns the number of receivers that currently exist.
///
/// # Examples
///
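The new `send_replace` returns the previous value, which is what makes the buffer-reuse pattern mentioned in its doc comment possible. A brief illustrative sketch, not part of the patch (the `Vec<u8>` buffer is just an example payload and assumes the `sync` feature):

```rust
use tokio::sync::watch;

fn main() {
    // The watched value is a reusable buffer.
    let (tx, rx) = watch::channel(vec![0u8; 1024]);

    // Take the old buffer back out while publishing a fresh one, so its
    // allocation can be refilled instead of dropped.
    let mut old = tx.send_replace(vec![1u8; 1024]);
    old.clear();
    old.resize(1024, 2);
    tx.send_replace(old);

    assert_eq!(rx.borrow()[0], 2);
}
```

Note that `send_replace` succeeds even when `rx` has been dropped, which is the other property called out in the new documentation.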
diff --git a/src/task/blocking.rs b/src/task/blocking.rs
index 806dbbd..825f25f 100644
--- a/src/task/blocking.rs
+++ b/src/task/blocking.rs
@@ -112,27 +112,82 @@ cfg_rt! {
/// still spawn additional threads for blocking operations. The basic
/// scheduler's single thread is only used for asynchronous code.
///
+ /// # Related APIs and patterns for bridging asynchronous and blocking code
+ ///
+ /// In simple cases, it is sufficient to have the closure accept input
+ /// parameters at creation time and return a single value (or struct/tuple, etc.).
+ ///
+ /// For more complex situations in which it is desirable to stream data to or from
+ /// the synchronous context, the [`mpsc channel`] has `blocking_send` and
+ /// `blocking_recv` methods for use in non-async code such as the thread created
+ /// by `spawn_blocking`.
+ ///
+ /// Another option is [`SyncIoBridge`] for cases where the synchronous context
+ /// is operating on byte streams. For example, you might use an asynchronous
+ /// HTTP client such as [hyper] to fetch data, but perform complex parsing
+ /// of the payload body using a library written for synchronous I/O.
+ ///
+ /// Finally, see also [Bridging with sync code][bridgesync] for discussions
+ /// around the opposite case of using Tokio as part of a larger synchronous
+ /// codebase.
+ ///
/// [`Builder`]: struct@crate::runtime::Builder
/// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
/// [rayon]: https://docs.rs/rayon
+ /// [`mpsc channel`]: crate::sync::mpsc
+ /// [`SyncIoBridge`]: https://docs.rs/tokio-util/0.6/tokio_util/io/struct.SyncIoBridge.html
+ /// [hyper]: https://docs.rs/hyper
/// [`thread::spawn`]: fn@std::thread::spawn
/// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
+ /// [bridgesync]: https://tokio.rs/tokio/topics/bridging
///
/// # Examples
///
+ /// Pass an input value and receive result of computation:
+ ///
/// ```
/// use tokio::task;
///
/// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
+ /// // Initial input
+ /// let mut v = "Hello, ".to_string();
/// let res = task::spawn_blocking(move || {
- /// // do some compute-heavy work or call synchronous code
- /// "done computing"
+ /// // Stand-in for compute-heavy work or using synchronous APIs
+ /// v.push_str("world");
+ /// // Pass ownership of the value back to the asynchronous context
+ /// v
/// }).await?;
///
- /// assert_eq!(res, "done computing");
+ /// // `res` is the value returned from the thread
+ /// assert_eq!(res.as_str(), "Hello, world");
/// # Ok(())
/// # }
/// ```
+ ///
+ /// Use a channel:
+ ///
+ /// ```
+ /// use tokio::task;
+ /// use tokio::sync::mpsc;
+ ///
+ /// # async fn docs() {
+ /// let (tx, mut rx) = mpsc::channel(2);
+ /// let start = 5;
+ /// let worker = task::spawn_blocking(move || {
+ /// for x in 0..10 {
+ /// // Stand in for complex computation
+ /// tx.blocking_send(start + x).unwrap();
+ /// }
+ /// });
+ ///
+ /// let mut acc = 0;
+ /// while let Some(v) = rx.recv().await {
+ /// acc += v;
+ /// }
+ /// assert_eq!(acc, 95);
+ /// worker.await.unwrap();
+ /// # }
+ /// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
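The new "Related APIs" prose also names `blocking_recv` for streaming in the opposite direction (async producer, blocking consumer). A hedged sketch of that direction, analogous to the `blocking_send` example added in the hunk and assuming Tokio's `macros`, `rt`, and `sync` features:

```rust
use tokio::sync::mpsc;
use tokio::task;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(2);

    // The blocking side consumes values with `blocking_recv`.
    let consumer = task::spawn_blocking(move || {
        let mut sum = 0;
        while let Some(v) = rx.blocking_recv() {
            sum += v;
        }
        sum
    });

    // The async side produces values, then drops the sender to end the stream.
    for x in 0..10 {
        tx.send(x).await.unwrap();
    }
    drop(tx);

    assert_eq!(consumer.await.unwrap(), 45);
}
```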
diff --git a/src/task/builder.rs b/src/task/builder.rs
index e46bdef..f991fc6 100644
--- a/src/task/builder.rs
+++ b/src/task/builder.rs
@@ -1,5 +1,4 @@
#![allow(unreachable_pub)]
-use crate::util::error::CONTEXT_MISSING_ERROR;
use crate::{runtime::context, task::JoinHandle};
use std::future::Future;
@@ -98,8 +97,6 @@ impl<'a> Builder<'a> {
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
- context::current()
- .expect(CONTEXT_MISSING_ERROR)
- .spawn_blocking_inner(function, self.name)
+ context::current().spawn_blocking_inner(function, self.name)
}
}
diff --git a/src/task/local.rs b/src/task/local.rs
index a28d793..4a5d313 100644
--- a/src/task/local.rs
+++ b/src/task/local.rs
@@ -211,10 +211,10 @@ cfg_rt! {
/// [`task::spawn_local`]: fn@spawn_local
/// [`mpsc`]: mod@crate::sync::mpsc
pub struct LocalSet {
- /// Current scheduler tick
+ /// Current scheduler tick.
tick: Cell<u8>,
- /// State available from thread-local
+ /// State available from thread-local.
context: Context,
/// This type should not be Send.
@@ -222,7 +222,7 @@ cfg_rt! {
}
}
-/// State available from the thread-local
+/// State available from the thread-local.
struct Context {
/// Collection of all active tasks spawned onto this executor.
owned: LocalOwnedTasks<Arc<Shared>>,
@@ -236,10 +236,10 @@ struct Context {
/// LocalSet state shared between threads.
struct Shared {
- /// Remote run queue sender
+ /// Remote run queue sender.
queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
- /// Wake the `LocalSet` task
+ /// Wake the `LocalSet` task.
waker: AtomicWaker,
}
@@ -315,13 +315,13 @@ cfg_rt! {
}
}
-/// Initial queue capacity
+/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;
/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;
-/// How often it check the remote queue first
+/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
impl LocalSet {
@@ -466,7 +466,7 @@ impl LocalSet {
rt.block_on(self.run_until(future))
}
- /// Run a future to completion on the local set, returning its output.
+ /// Runs a future to completion on the local set, returning its output.
///
/// This returns a future that runs the given future with a local set,
/// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
@@ -505,7 +505,7 @@ impl LocalSet {
run_until.await
}
- /// Tick the scheduler, returning whether the local future needs to be
+ /// Ticks the scheduler, returning whether the local future needs to be
/// notified again.
fn tick(&self) -> bool {
for _ in 0..MAX_TASKS_PER_TICK {
diff --git a/src/time/clock.rs b/src/time/clock.rs
index fae5c76..41be9ba 100644
--- a/src/time/clock.rs
+++ b/src/time/clock.rs
@@ -57,11 +57,11 @@ cfg_test_util! {
/// Instant to use as the clock's base instant.
base: std::time::Instant,
- /// Instant at which the clock was last unfrozen
+ /// Instant at which the clock was last unfrozen.
unfrozen: Option<std::time::Instant>,
}
- /// Pause time
+ /// Pauses time.
///
/// The current value of `Instant::now()` is saved and all subsequent calls
/// to `Instant::now()` will return the saved value. The saved value can be
@@ -101,7 +101,7 @@ cfg_test_util! {
clock.pause();
}
- /// Resume time
+ /// Resumes time.
///
/// Clears the saved `Instant::now()` value. Subsequent calls to
/// `Instant::now()` will return the value returned by the system call.
@@ -121,7 +121,7 @@ cfg_test_util! {
inner.unfrozen = Some(std::time::Instant::now());
}
- /// Advance time.
+ /// Advances time.
///
/// Increments the saved `Instant::now()` value by `duration`. Subsequent
/// calls to `Instant::now()` will return the result of the increment.
@@ -159,7 +159,7 @@ cfg_test_util! {
crate::task::yield_now().await;
}
- /// Return the current instant, factoring in frozen time.
+ /// Returns the current instant, factoring in frozen time.
pub(crate) fn now() -> Instant {
if let Some(clock) = clock() {
clock.now()
@@ -169,7 +169,7 @@ cfg_test_util! {
}
impl Clock {
- /// Return a new `Clock` instance that uses the current execution context's
+ /// Returns a new `Clock` instance that uses the current execution context's
/// source of time.
pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock {
let now = std::time::Instant::now();
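The pause/resume/advance functions documented in this hunk are the public face of the test clock. A minimal usage sketch, assuming the `test-util` feature is enabled (which is what gates these functions) along with `rt`, `time`, and `macros`:

```rust
use tokio::time::{self, Duration, Instant};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Freeze the clock; `Instant::now()` stops moving on its own.
    time::pause();
    let start = Instant::now();

    // Move the frozen clock forward explicitly.
    time::advance(Duration::from_secs(5)).await;
    assert!(start.elapsed() >= Duration::from_secs(5));

    // Return to the system clock.
    time::resume();
}
```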
diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs
index 168e0b9..9e9f0dc 100644
--- a/src/time/driver/entry.rs
+++ b/src/time/driver/entry.rs
@@ -345,7 +345,7 @@ impl TimerShared {
}
}
- /// Gets the cached time-of-expiration value
+ /// Gets the cached time-of-expiration value.
pub(super) fn cached_when(&self) -> u64 {
// Cached-when is only accessed under the driver lock, so we can use relaxed
self.driver_state.0.cached_when.load(Ordering::Relaxed)
diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs
index 77b4358..7aaf65a 100644
--- a/src/time/driver/handle.rs
+++ b/src/time/driver/handle.rs
@@ -16,17 +16,17 @@ impl Handle {
Handle { time_source, inner }
}
- /// Returns the time source associated with this handle
+ /// Returns the time source associated with this handle.
pub(super) fn time_source(&self) -> &ClockTime {
&self.time_source
}
- /// Access the driver's inner structure
+ /// Access the driver's inner structure.
pub(super) fn get(&self) -> &super::Inner {
&*self.inner
}
- // Check whether the driver has been shutdown
+ /// Checks whether the driver has been shutdown.
pub(super) fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs
index f611fbb..cf2290b 100644
--- a/src/time/driver/mod.rs
+++ b/src/time/driver/mod.rs
@@ -4,7 +4,7 @@
#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
-//! Time driver
+//! Time driver.
mod entry;
pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
@@ -83,13 +83,13 @@ use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver<P: Park + 'static> {
- /// Timing backend in use
+ /// Timing backend in use.
time_source: ClockTime,
- /// Shared state
+ /// Shared state.
handle: Handle,
- /// Parker to delegate to
+ /// Parker to delegate to.
park: P,
// When `true`, a call to `park_timeout` should immediately return and time
@@ -146,25 +146,25 @@ struct Inner {
// The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
pub(super) state: Mutex<InnerState>,
- /// True if the driver is being shutdown
+ /// True if the driver is being shutdown.
pub(super) is_shutdown: AtomicBool,
}
/// Time state shared which must be protected by a `Mutex`
struct InnerState {
- /// Timing backend in use
+ /// Timing backend in use.
time_source: ClockTime,
/// The last published timer `elapsed` value.
elapsed: u64,
- /// The earliest time at which we promise to wake up without unparking
+ /// The earliest time at which we promise to wake up without unparking.
next_wake: Option<NonZeroU64>,
- /// Timer wheel
+ /// Timer wheel.
wheel: wheel::Wheel,
- /// Unparker that can be used to wake the time driver
+ /// Unparker that can be used to wake the time driver.
unpark: Box<dyn Unpark>,
}
diff --git a/src/time/driver/sleep.rs b/src/time/driver/sleep.rs
index 4e9ed65..43ff694 100644
--- a/src/time/driver/sleep.rs
+++ b/src/time/driver/sleep.rs
@@ -1,11 +1,17 @@
use crate::time::driver::{Handle, TimerEntry};
use crate::time::{error::Error, Duration, Instant};
+use crate::util::trace;
use pin_project_lite::pin_project;
use std::future::Future;
+use std::panic::Location;
use std::pin::Pin;
use std::task::{self, Poll};
+cfg_trace! {
+ use crate::time::driver::ClockTime;
+}
+
/// Waits until `deadline` is reached.
///
/// No work is performed while awaiting on the sleep future to complete. `Sleep`
@@ -39,8 +45,9 @@ use std::task::{self, Poll};
/// [`interval`]: crate::time::interval()
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "delay_until"))]
+#[cfg_attr(tokio_track_caller, track_caller)]
pub fn sleep_until(deadline: Instant) -> Sleep {
- Sleep::new_timeout(deadline)
+ return Sleep::new_timeout(deadline, trace::caller_location());
}
/// Waits until `duration` has elapsed.
@@ -82,10 +89,13 @@ pub fn sleep_until(deadline: Instant) -> Sleep {
// Alias for old name in 0.x
#[cfg_attr(docsrs, doc(alias = "delay_for"))]
#[cfg_attr(docsrs, doc(alias = "wait"))]
+#[cfg_attr(tokio_track_caller, track_caller)]
pub fn sleep(duration: Duration) -> Sleep {
+ let location = trace::caller_location();
+
match Instant::now().checked_add(duration) {
- Some(deadline) => sleep_until(deadline),
- None => sleep_until(Instant::far_future()),
+ Some(deadline) => Sleep::new_timeout(deadline, location),
+ None => Sleep::new_timeout(Instant::far_future(), location),
}
}
@@ -182,7 +192,7 @@ pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
- deadline: Instant,
+ inner: Inner,
// The link between the `Sleep` instance and the timer that drives it.
#[pin]
@@ -190,21 +200,87 @@ pin_project! {
}
}
+cfg_trace! {
+ #[derive(Debug)]
+ struct Inner {
+ deadline: Instant,
+ resource_span: tracing::Span,
+ async_op_span: tracing::Span,
+ time_source: ClockTime,
+ }
+}
+
+cfg_not_trace! {
+ #[derive(Debug)]
+ struct Inner {
+ deadline: Instant,
+ }
+}
+
impl Sleep {
- pub(crate) fn new_timeout(deadline: Instant) -> Sleep {
+ #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
+ pub(crate) fn new_timeout(
+ deadline: Instant,
+ location: Option<&'static Location<'static>>,
+ ) -> Sleep {
let handle = Handle::current();
let entry = TimerEntry::new(&handle, deadline);
- Sleep { deadline, entry }
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let inner = {
+ let time_source = handle.time_source().clone();
+ let deadline_tick = time_source.deadline_to_tick(deadline);
+ let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0);
+
+ #[cfg(tokio_track_caller)]
+ let location = location.expect("should have location if tracking caller");
+
+ #[cfg(tokio_track_caller)]
+ let resource_span = tracing::trace_span!(
+ "runtime.resource",
+ concrete_type = "Sleep",
+ kind = "timer",
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
+ );
+
+ #[cfg(not(tokio_track_caller))]
+ let resource_span =
+ tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer");
+
+ let async_op_span =
+ tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout");
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ parent: resource_span.id(),
+ duration = duration,
+ duration.unit = "ms",
+ duration.op = "override",
+ );
+
+ Inner {
+ deadline,
+ resource_span,
+ async_op_span,
+ time_source,
+ }
+ };
+
+ #[cfg(not(all(tokio_unstable, feature = "tracing")))]
+ let inner = Inner { deadline };
+
+ Sleep { inner, entry }
}
- pub(crate) fn far_future() -> Sleep {
- Self::new_timeout(Instant::far_future())
+ pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
+ Self::new_timeout(Instant::far_future(), location)
}
/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
- self.deadline
+ self.inner.deadline
}
/// Returns `true` if `Sleep` has elapsed.
@@ -244,37 +320,83 @@ impl Sleep {
///
/// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
+ self.reset_inner(deadline)
+ }
+
+ fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
- *me.deadline = deadline;
+ (*me.inner).deadline = deadline;
+
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ {
+ me.inner.async_op_span =
+ tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");
+
+ tracing::trace!(
+ target: "runtime::resource::state_update",
+ parent: me.inner.resource_span.id(),
+ duration = {
+ let now = me.inner.time_source.now();
+ let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
+ deadline_tick.checked_sub(now).unwrap_or(0)
+ },
+ duration.unit = "ms",
+ duration.op = "override",
+ );
+ }
}
- fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
- let me = self.project();
+ cfg_not_trace! {
+ fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ let me = self.project();
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
- me.entry.poll_elapsed(cx).map(move |r| {
- coop.made_progress();
- r
- })
+ me.entry.poll_elapsed(cx).map(move |r| {
+ coop.made_progress();
+ r
+ })
+ }
+ }
+
+ cfg_trace! {
+ fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
+ let me = self.project();
+ // Keep track of task budget
+ let coop = ready!(trace_poll_op!(
+ "poll_elapsed",
+ crate::coop::poll_proceed(cx),
+ me.inner.resource_span.id(),
+ ));
+
+ let result = me.entry.poll_elapsed(cx).map(move |r| {
+ coop.made_progress();
+ r
+ });
+
+ trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id())
+ }
}
}
impl Future for Sleep {
type Output = ();
+ // `poll_elapsed` can return an error in two cases:
+ //
+ // - AtCapacity: this is a pathological case where far too many
+ // sleep instances have been scheduled.
+ // - Shutdown: No timer has been setup, which is a mis-use error.
+ //
+ // Both cases are extremely rare, and pretty accurately fit into
+ // "logic errors", so we just panic in this case. A user couldn't
+ // really do much better if we passed the error onwards.
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
- // `poll_elapsed` can return an error in two cases:
- //
- // - AtCapacity: this is a pathological case where far too many
- // sleep instances have been scheduled.
- // - Shutdown: No timer has been setup, which is a mis-use error.
- //
- // Both cases are extremely rare, and pretty accurately fit into
- // "logic errors", so we just panic in this case. A user couldn't
- // really do much better if we passed the error onwards.
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ let _span = self.inner.async_op_span.clone().entered();
+
match ready!(self.as_mut().poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
diff --git a/src/time/driver/wheel/level.rs b/src/time/driver/wheel/level.rs
index 81d6b58..34d3176 100644
--- a/src/time/driver/wheel/level.rs
+++ b/src/time/driver/wheel/level.rs
@@ -250,7 +250,7 @@ fn level_range(level: usize) -> u64 {
LEVEL_MULT as u64 * slot_range(level)
}
-/// Convert a duration (milliseconds) and a level to a slot position
+/// Converts a duration (milliseconds) and a level to a slot position.
fn slot_for(duration: u64, level: usize) -> usize {
((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
}
diff --git a/src/time/driver/wheel/mod.rs b/src/time/driver/wheel/mod.rs
index 5a40f6d..f088f2c 100644
--- a/src/time/driver/wheel/mod.rs
+++ b/src/time/driver/wheel/mod.rs
@@ -46,11 +46,11 @@ pub(crate) struct Wheel {
/// precision of 1 millisecond.
const NUM_LEVELS: usize = 6;
-/// The maximum duration of a `Sleep`
+/// The maximum duration of a `Sleep`.
pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
impl Wheel {
- /// Create a new timing wheel
+ /// Creates a new timing wheel.
pub(crate) fn new() -> Wheel {
let levels = (0..NUM_LEVELS).map(Level::new).collect();
@@ -61,13 +61,13 @@ impl Wheel {
}
}
- /// Return the number of milliseconds that have elapsed since the timing
+ /// Returns the number of milliseconds that have elapsed since the timing
/// wheel's creation.
pub(crate) fn elapsed(&self) -> u64 {
self.elapsed
}
- /// Insert an entry into the timing wheel.
+ /// Inserts an entry into the timing wheel.
///
/// # Arguments
///
@@ -115,7 +115,7 @@ impl Wheel {
Ok(when)
}
- /// Remove `item` from the timing wheel.
+ /// Removes `item` from the timing wheel.
pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) {
unsafe {
let when = item.as_ref().cached_when();
@@ -136,7 +136,7 @@ impl Wheel {
}
}
- /// Instant at which to poll
+ /// Instant at which to poll.
pub(crate) fn poll_at(&self) -> Option<u64> {
self.next_expiration().map(|expiration| expiration.deadline)
}
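As a sanity check on the `MAX_DURATION` constant above: with `NUM_LEVELS = 6` the wheel spans `2^36 - 1` milliseconds. A small sketch of that arithmetic:

```rust
fn main() {
    const NUM_LEVELS: u32 = 6;
    // Same expression as MAX_DURATION above.
    let max_duration_ms: u64 = (1u64 << (6 * NUM_LEVELS)) - 1;
    assert_eq!(max_duration_ms, 68_719_476_735);
    // Roughly 795 days, i.e. a bit over two years of sleepable time.
    println!("{} days", max_duration_ms / 1000 / 60 / 60 / 24);
}
```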
diff --git a/src/time/driver/wheel/stack.rs b/src/time/driver/wheel/stack.rs
index e7ed137..80651c3 100644
--- a/src/time/driver/wheel/stack.rs
+++ b/src/time/driver/wheel/stack.rs
@@ -3,7 +3,7 @@ use crate::time::driver::Entry;
use std::ptr;
-/// A doubly linked stack
+/// A doubly linked stack.
#[derive(Debug)]
pub(crate) struct Stack {
head: Option<OwnedItem>,
@@ -50,7 +50,7 @@ impl Stack {
self.head = Some(entry);
}
- /// Pops an item from the stack
+ /// Pops an item from the stack.
pub(crate) fn pop(&mut self) -> Option<OwnedItem> {
let entry = self.head.take();
diff --git a/src/time/error.rs b/src/time/error.rs
index 8674feb..63f0a3b 100644
--- a/src/time/error.rs
+++ b/src/time/error.rs
@@ -40,7 +40,7 @@ impl From<Kind> for Error {
}
}
-/// Error returned by `Timeout`.
+/// Errors returned by `Timeout`.
#[derive(Debug, PartialEq)]
pub struct Elapsed(());
@@ -72,7 +72,7 @@ impl Error {
matches!(self.0, Kind::AtCapacity)
}
- /// Create an error representing a misconfigured timer.
+ /// Creates an error representing a misconfigured timer.
pub fn invalid() -> Error {
Error(Invalid)
}
diff --git a/src/time/interval.rs b/src/time/interval.rs
index a63e47b..7e07e51 100644
--- a/src/time/interval.rs
+++ b/src/time/interval.rs
@@ -147,7 +147,7 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval {
/// milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissedTickBehavior {
- /// Tick as fast as possible until caught up.
+ /// Ticks as fast as possible until caught up.
///
/// When this strategy is used, [`Interval`] schedules ticks "normally" (the
/// same as it would have if the ticks hadn't been delayed), which results
@@ -252,7 +252,7 @@ pub enum MissedTickBehavior {
/// [`tick`]: Interval::tick
Delay,
- /// Skip missed ticks and tick on the next multiple of `period` from
+ /// Skips missed ticks and ticks on the next multiple of `period` from
/// `start`.
///
/// When this strategy is used, [`Interval`] schedules the next tick to fire
@@ -342,7 +342,7 @@ impl Default for MissedTickBehavior {
}
}
-/// Interval returned by [`interval`] and [`interval_at`]
+/// Interval returned by [`interval`] and [`interval_at`].
///
/// This type allows you to wait on a sequence of instants with a certain
/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets
@@ -367,6 +367,11 @@ pub struct Interval {
impl Interval {
/// Completes when the next instant in the interval has been reached.
///
+ /// # Cancel safety
+ ///
+ /// This method is cancellation safe. If `tick` is used as the branch in a `tokio::select!` and
+ /// another branch completes first, then no tick has been consumed.
+ ///
/// # Examples
///
/// ```
@@ -389,7 +394,7 @@ impl Interval {
poll_fn(|cx| self.poll_tick(cx)).await
}
- /// Poll for the next instant in the interval to be reached.
+ /// Polls for the next instant in the interval to be reached.
///
/// This method can return the following values:
///
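The cancel-safety note added above is easiest to see with `tokio::select!`: if another branch wins the race, the interval has not consumed a tick, so the next `tick().await` still fires on schedule. A minimal sketch (the channel and durations are illustrative), which also exercises `set_missed_tick_behavior` from the `MissedTickBehavior` docs earlier in this file:

```rust
use tokio::sync::oneshot;
use tokio::time::{interval, Duration, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let mut ticker = interval(Duration::from_millis(100));
    // If ticks are ever missed, delay instead of bursting to catch up.
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let (tx, rx) = oneshot::channel::<()>();
    tx.send(()).unwrap();

    tokio::select! {
        _ = ticker.tick() => println!("tick fired first"),
        // Because `tick` is cancellation safe, losing this race does not
        // consume a tick; the next `ticker.tick().await` is unaffected.
        _ = rx => println!("other branch completed first"),
    }

    ticker.tick().await;
}
```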
diff --git a/src/time/timeout.rs b/src/time/timeout.rs
index 61964ad..6725caa 100644
--- a/src/time/timeout.rs
+++ b/src/time/timeout.rs
@@ -4,14 +4,17 @@
//!
//! [`Timeout`]: struct@Timeout
-use crate::time::{error::Elapsed, sleep_until, Duration, Instant, Sleep};
+use crate::{
+ time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
+ util::trace,
+};
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
-/// Require a `Future` to complete before the specified duration has elapsed.
+/// Requires a `Future` to complete before the specified duration has elapsed.
///
/// If the future completes before the duration has elapsed, then the completed
/// value is returned. Otherwise, an error is returned and the future is
@@ -45,19 +48,22 @@ use std::task::{self, Poll};
/// }
/// # }
/// ```
+#[cfg_attr(tokio_track_caller, track_caller)]
pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T>
where
T: Future,
{
+ let location = trace::caller_location();
+
let deadline = Instant::now().checked_add(duration);
let delay = match deadline {
- Some(deadline) => Sleep::new_timeout(deadline),
- None => Sleep::far_future(),
+ Some(deadline) => Sleep::new_timeout(deadline, location),
+ None => Sleep::far_future(location),
};
Timeout::new_with_delay(future, delay)
}
-/// Require a `Future` to complete before the specified instant in time.
+/// Requires a `Future` to complete before the specified instant in time.
///
/// If the future completes before the instant is reached, then the completed
/// value is returned. Otherwise, an error is returned.
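The behavior described above is unchanged by this diff: `timeout` wraps the inner future and yields `Err(Elapsed)` if the deadline passes first. A minimal sketch, assuming a time-enabled runtime:

```rust
use tokio::time::{sleep, timeout, Duration};

#[tokio::main]
async fn main() {
    // The inner future needs 200ms but the deadline is 50ms, so the result
    // is Err(Elapsed) and the inner future is dropped (cancelled).
    let result = timeout(Duration::from_millis(50), sleep(Duration::from_millis(200))).await;
    assert!(result.is_err());

    // With a generous deadline the completed value is passed through as Ok.
    let result = timeout(Duration::from_millis(200), async { 42 }).await;
    assert_eq!(result.unwrap(), 42);
}
```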
diff --git a/src/util/bit.rs b/src/util/bit.rs
index 392a0e8..a43c2c2 100644
--- a/src/util/bit.rs
+++ b/src/util/bit.rs
@@ -27,7 +27,7 @@ impl Pack {
pointer_width() - (self.mask >> self.shift).leading_zeros()
}
- /// Max representable value
+ /// Max representable value.
pub(crate) const fn max_value(&self) -> usize {
(1 << self.width()) - 1
}
@@ -60,7 +60,7 @@ impl fmt::Debug for Pack {
}
}
-/// Returns the width of a pointer in bits
+/// Returns the width of a pointer in bits.
pub(crate) const fn pointer_width() -> u32 {
std::mem::size_of::<usize>() as u32 * 8
}
@@ -71,7 +71,7 @@ pub(crate) const fn mask_for(n: u32) -> usize {
shift | (shift - 1)
}
-/// Unpack a value using a mask & shift
+/// Unpacks a value using a mask & shift.
pub(crate) const fn unpack(src: usize, mask: usize, shift: u32) -> usize {
(src & mask) >> shift
}
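A worked example of the mask-and-shift arithmetic in `unpack` above, reimplemented standalone; the mask and shift values below are illustrative, not ones tokio actually uses:

```rust
/// Same formula as the `unpack` shown above.
const fn unpack(src: usize, mask: usize, shift: u32) -> usize {
    (src & mask) >> shift
}

fn main() {
    // A field occupying bits 4..7 of the packed word.
    let mask = 0b0111_0000;
    let shift = 4;
    let src = 0b1011_0100;
    // 0b1011_0100 & 0b0111_0000 == 0b0011_0000; shifted right by 4 gives 0b011 == 3.
    assert_eq!(unpack(src, mask, shift), 3);
}
```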
diff --git a/src/util/error.rs b/src/util/error.rs
index 0e52364..8f252c0 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -7,3 +7,11 @@ pub(crate) const CONTEXT_MISSING_ERROR: &str =
/// Error string explaining that the Tokio context is shutting down and cannot drive timers.
pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str =
"A Tokio 1.x context was found, but it is being shutdown.";
+
+// some combinations of features might not use this
+#[allow(dead_code)]
+/// Error string explaining that the Tokio context is not available because the
+/// thread-local storing it has been destroyed. This usually only happens during
+/// destructors of other thread-locals.
+pub(crate) const THREAD_LOCAL_DESTROYED_ERROR: &str =
+ "The Tokio context thread-local variable has been destroyed.";
diff --git a/src/util/linked_list.rs b/src/util/linked_list.rs
index 1eab81c..894d216 100644
--- a/src/util/linked_list.rs
+++ b/src/util/linked_list.rs
@@ -1,6 +1,6 @@
#![cfg_attr(not(feature = "full"), allow(dead_code))]
-//! An intrusive double linked list of data
+//! An intrusive doubly linked list of data.
//!
//! The data structure supports tracking pinned nodes. Most of the data
//! structure's APIs are `unsafe` as they require the caller to ensure the
@@ -46,10 +46,10 @@ pub(crate) unsafe trait Link {
/// This is usually a pointer-ish type.
type Handle;
- /// Node type
+ /// Node type.
type Target;
- /// Convert the handle to a raw pointer without consuming the handle
+ /// Convert the handle to a raw pointer without consuming the handle.
#[allow(clippy::wrong_self_convention)]
fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target>;
@@ -60,7 +60,7 @@ pub(crate) unsafe trait Link {
unsafe fn pointers(target: NonNull<Self::Target>) -> NonNull<Pointers<Self::Target>>;
}
-/// Previous / next pointers
+/// Previous / next pointers.
pub(crate) struct Pointers<T> {
inner: UnsafeCell<PointersInner<T>>,
}
diff --git a/src/util/rand.rs b/src/util/rand.rs
index 17b3ec1..6b19c8b 100644
--- a/src/util/rand.rs
+++ b/src/util/rand.rs
@@ -1,6 +1,6 @@
use std::cell::Cell;
-/// Fast random number generate
+/// Fast random number generator.
///
/// Implement xorshift64+: 2 32-bit xorshift sequences added together.
/// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
@@ -14,7 +14,7 @@ pub(crate) struct FastRand {
}
impl FastRand {
- /// Initialize a new, thread-local, fast random number generator.
+ /// Initializes a new, thread-local, fast random number generator.
pub(crate) fn new(seed: u64) -> FastRand {
let one = (seed >> 32) as u32;
let mut two = seed as u32;
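The doc comment above describes xorshift64+ with the `[17, 7, 16]` shift triplet. Below is a standalone sketch of that scheme as a generic illustration of the technique, not necessarily byte-for-byte what tokio's `FastRand` does:

```rust
use std::cell::Cell;

struct FastRandSketch {
    one: Cell<u32>,
    two: Cell<u32>,
}

impl FastRandSketch {
    fn new(seed: u64) -> Self {
        // Split the 64-bit seed into the two 32-bit xorshift states,
        // avoiding an all-zero state that would stay stuck at zero.
        let one = (seed >> 32) as u32;
        let mut two = seed as u32;
        if one == 0 && two == 0 {
            two = 1;
        }
        Self { one: Cell::new(one), two: Cell::new(two) }
    }

    fn next_u32(&self) -> u32 {
        // xorshift with the [17, 7, 16] triplet; the two lanes are added together.
        let mut s1 = self.one.get();
        let s0 = self.two.get();
        s1 ^= s1 << 17;
        s1 = s1 ^ s0 ^ (s1 >> 7) ^ (s0 >> 16);
        self.one.set(s0);
        self.two.set(s1);
        s0.wrapping_add(s1)
    }
}

fn main() {
    let rng = FastRandSketch::new(0xDEAD_BEEF_CAFE_F00D);
    println!("{} {}", rng.next_u32(), rng.next_u32());
}
```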
diff --git a/src/util/slab.rs b/src/util/slab.rs
index 2ddaa6c..97355d5 100644
--- a/src/util/slab.rs
+++ b/src/util/slab.rs
@@ -85,11 +85,11 @@ pub(crate) struct Address(usize);
/// An entry in the slab.
pub(crate) trait Entry: Default {
- /// Reset the entry's value and track the generation.
+ /// Resets the entry's value and tracks the generation.
fn reset(&self);
}
-/// A reference to a value stored in the slab
+/// A reference to a value stored in the slab.
pub(crate) struct Ref<T> {
value: *const Value<T>,
}
@@ -101,9 +101,9 @@ const NUM_PAGES: usize = 19;
const PAGE_INITIAL_SIZE: usize = 32;
const PAGE_INDEX_SHIFT: u32 = PAGE_INITIAL_SIZE.trailing_zeros() + 1;
-/// A page in the slab
+/// A page in the slab.
struct Page<T> {
- /// Slots
+ /// Slots.
slots: Mutex<Slots<T>>,
// Number of slots currently being used. This is not guaranteed to be up to
@@ -116,7 +116,7 @@ struct Page<T> {
// The number of slots the page can hold.
len: usize,
- // Length of all previous pages combined
+ // Length of all previous pages combined.
prev_len: usize,
}
@@ -128,9 +128,9 @@ struct CachedPage<T> {
init: usize,
}
-/// Page state
+/// Page state.
struct Slots<T> {
- /// Slots
+ /// Slots.
slots: Vec<Slot<T>>,
head: usize,
@@ -159,9 +159,9 @@ struct Slot<T> {
next: u32,
}
-/// Value paired with a reference to the page
+/// Value paired with a reference to the page.
struct Value<T> {
- /// Value stored in the value
+ /// Value stored in the slot.
value: T,
/// Pointer to the page containing the slot.
@@ -171,7 +171,7 @@ struct Value<T> {
}
impl<T> Slab<T> {
- /// Create a new, empty, slab
+ /// Creates a new, empty slab.
pub(crate) fn new() -> Slab<T> {
// Initializing arrays is a bit annoying. Instead of manually writing
// out an array and every single entry, `Default::default()` is used to
@@ -455,7 +455,7 @@ impl<T> Page<T> {
addr.0 - self.prev_len
}
- /// Returns the address for the given slot
+ /// Returns the address for the given slot.
fn addr(&self, slot: usize) -> Address {
Address(slot + self.prev_len)
}
@@ -478,7 +478,7 @@ impl<T> Default for Page<T> {
}
impl<T> Page<T> {
- /// Release a slot into the page's free list
+ /// Release a slot into the page's free list.
fn release(&self, value: *const Value<T>) {
let mut locked = self.slots.lock();
@@ -492,7 +492,7 @@ impl<T> Page<T> {
}
impl<T> CachedPage<T> {
- /// Refresh the cache
+ /// Refreshes the cache.
fn refresh(&mut self, page: &Page<T>) {
let slots = page.slots.lock();
@@ -502,7 +502,7 @@ impl<T> CachedPage<T> {
}
}
- // Get a value by index
+ /// Gets a value by index.
fn get(&self, idx: usize) -> &T {
assert!(idx < self.init);
@@ -576,7 +576,7 @@ impl<T: Entry> Slot<T> {
}
impl<T> Value<T> {
- // Release the slot, returning the `Arc<Page<T>>` logically owned by the ref.
+ /// Releases the slot, returning the `Arc<Page<T>>` logically owned by the ref.
fn release(&self) -> Arc<Page<T>> {
// Safety: called by `Ref`, which owns an `Arc<Page<T>>` instance.
let page = unsafe { Arc::from_raw(self.page) };
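The `addr`/slot conversions visible above are plain offsets by the page's `prev_len`. A hedged sketch of that bookkeeping follows; the field names mirror the diff, while the concrete page sizes are illustrative assumptions:

```rust
struct PageSketch {
    // Number of slots the page can hold.
    len: usize,
    // Length of all previous pages combined.
    prev_len: usize,
}

impl PageSketch {
    // Mirrors `addr` above: a page-local slot index becomes a global address.
    fn addr(&self, slot: usize) -> usize {
        slot + self.prev_len
    }

    // The inverse shown above: a global address back to a page-local slot index.
    fn slot(&self, addr: usize) -> usize {
        addr - self.prev_len
    }
}

fn main() {
    // Suppose (illustratively) the first two pages hold 32 and 64 slots,
    // so this page's slots start at global address 96.
    let page = PageSketch { len: 128, prev_len: 96 };
    assert!(5 < page.len);
    assert_eq!(page.addr(5), 101);
    assert_eq!(page.slot(101), 5);
}
```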
diff --git a/src/util/trace.rs b/src/util/trace.rs
index 61c155c..e3c26f9 100644
--- a/src/util/trace.rs
+++ b/src/util/trace.rs
@@ -14,7 +14,9 @@ cfg_trace! {
"runtime.spawn",
%kind,
task.name = %name.unwrap_or_default(),
- spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
+ loc.file = location.file(),
+ loc.line = location.line(),
+ loc.col = location.column(),
);
#[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
@@ -27,6 +29,15 @@ cfg_trace! {
}
}
}
+cfg_time! {
+ #[cfg_attr(tokio_track_caller, track_caller)]
+ pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> {
+ #[cfg(all(tokio_track_caller, tokio_unstable, feature = "tracing"))]
+ return Some(std::panic::Location::caller());
+ #[cfg(not(all(tokio_track_caller, tokio_unstable, feature = "tracing")))]
+ None
+ }
+}
cfg_not_trace! {
cfg_rt! {
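The `caller_location` helper added above leans on `#[track_caller]` and `std::panic::Location`. A minimal sketch of that mechanism in isolation, without the cfg gating used in the diff:

```rust
#[track_caller]
fn caller_location() -> &'static std::panic::Location<'static> {
    // Because of #[track_caller], this reports the location of our caller,
    // not the location of this function body.
    std::panic::Location::caller()
}

fn main() {
    let loc = caller_location();
    // These are the fields the tracing span above records as
    // loc.file / loc.line / loc.col.
    println!("{}:{}:{}", loc.file(), loc.line(), loc.column());
}
```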
diff --git a/src/util/vec_deque_cell.rs b/src/util/vec_deque_cell.rs
index 12883ab..b4e124c 100644
--- a/src/util/vec_deque_cell.rs
+++ b/src/util/vec_deque_cell.rs
@@ -45,7 +45,7 @@ impl<T> VecDequeCell<T> {
}
}
- /// Replace the inner VecDeque with an empty VecDeque and return the current
+ /// Replaces the inner VecDeque with an empty VecDeque and returns the current
/// contents.
pub(crate) fn take(&self) -> VecDeque<T> {
unsafe { self.with_inner(|inner| std::mem::take(inner)) }
diff --git a/src/util/wake.rs b/src/util/wake.rs
index 5773937..8f89668 100644
--- a/src/util/wake.rs
+++ b/src/util/wake.rs
@@ -4,12 +4,12 @@ use std::ops::Deref;
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable, Waker};
-/// Simplified waking interface based on Arcs
+/// Simplified waking interface based on Arcs.
pub(crate) trait Wake: Send + Sync {
- /// Wake by value
+ /// Wake by value.
fn wake(self: Arc<Self>);
- /// Wake by reference
+ /// Wake by reference.
fn wake_by_ref(arc_self: &Arc<Self>);
}
diff --git a/tests/macros_select.rs b/tests/macros_select.rs
index 4da88fb..b4f8544 100644
--- a/tests/macros_select.rs
+++ b/tests/macros_select.rs
@@ -558,3 +558,29 @@ pub async fn default_numeric_fallback() {
else => (),
}
}
+
+// https://github.com/tokio-rs/tokio/issues/4182
+#[tokio::test]
+async fn mut_ref_patterns() {
+ tokio::select! {
+ Some(mut foo) = async { Some("1".to_string()) } => {
+ assert_eq!(foo, "1");
+ foo = "2".to_string();
+ assert_eq!(foo, "2");
+ },
+ };
+
+ tokio::select! {
+ Some(ref foo) = async { Some("1".to_string()) } => {
+ assert_eq!(*foo, "1");
+ },
+ };
+
+ tokio::select! {
+ Some(ref mut foo) = async { Some("1".to_string()) } => {
+ assert_eq!(*foo, "1");
+ *foo = "2".to_string();
+ assert_eq!(*foo, "2");
+ },
+ };
+}
diff --git a/tests/macros_test.rs b/tests/macros_test.rs
index 7212c7b..bca2c91 100644
--- a/tests/macros_test.rs
+++ b/tests/macros_test.rs
@@ -30,3 +30,19 @@ fn trait_method() {
}
().f()
}
+
+// https://github.com/tokio-rs/tokio/issues/4175
+#[tokio::main]
+pub async fn issue_4175_main_1() -> ! {
+ panic!();
+}
+#[tokio::main]
+pub async fn issue_4175_main_2() -> std::io::Result<()> {
+ panic!();
+}
+#[allow(unreachable_code)]
+#[tokio::test]
+pub async fn issue_4175_test() -> std::io::Result<()> {
+ return Ok(());
+ panic!();
+}
diff --git a/tests/rt_basic.rs b/tests/rt_basic.rs
index 4b1bdad..70056b1 100644
--- a/tests/rt_basic.rs
+++ b/tests/rt_basic.rs
@@ -3,10 +3,14 @@
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
+use tokio::time::{timeout, Duration};
use tokio_test::{assert_err, assert_ok};
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::task::{Context, Poll};
use std::thread;
-use tokio::time::{timeout, Duration};
mod support {
pub(crate) mod mpsc_stream;
@@ -136,6 +140,35 @@ fn acquire_mutex_in_drop() {
}
#[test]
+fn drop_tasks_in_context() {
+ static SUCCESS: AtomicBool = AtomicBool::new(false);
+
+ struct ContextOnDrop;
+
+ impl Future for ContextOnDrop {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Pending
+ }
+ }
+
+ impl Drop for ContextOnDrop {
+ fn drop(&mut self) {
+ if tokio::runtime::Handle::try_current().is_ok() {
+ SUCCESS.store(true, Ordering::SeqCst);
+ }
+ }
+ }
+
+ let rt = rt();
+ rt.spawn(ContextOnDrop);
+ drop(rt);
+
+ assert!(SUCCESS.load(Ordering::SeqCst));
+}
+
+#[test]
#[should_panic(
expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
)]
diff --git a/tests/rt_threaded.rs b/tests/rt_threaded.rs
index 9e76c4e..5f047a7 100644
--- a/tests/rt_threaded.rs
+++ b/tests/rt_threaded.rs
@@ -54,6 +54,7 @@ fn many_oneshot_futures() {
drop(rt);
}
}
+
#[test]
fn many_multishot_futures() {
const CHAIN: usize = 200;
@@ -473,6 +474,30 @@ fn wake_during_shutdown() {
rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await });
}
+#[should_panic]
+#[tokio::test]
+async fn test_block_in_place1() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_block_in_place2() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[should_panic]
+#[tokio::main(flavor = "current_thread")]
+#[test]
+async fn test_block_in_place3() {
+ tokio::task::block_in_place(|| {});
+}
+
+#[tokio::main]
+#[test]
+async fn test_block_in_place4() {
+ tokio::task::block_in_place(|| {});
+}
+
fn rt() -> Runtime {
Runtime::new().unwrap()
}
diff --git a/tests/udp.rs b/tests/udp.rs
index 715d8eb..ec2a1e9 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -5,6 +5,7 @@ use futures::future::poll_fn;
use std::io;
use std::sync::Arc;
use tokio::{io::ReadBuf, net::UdpSocket};
+use tokio_test::assert_ok;
const MSG: &[u8] = b"hello";
const MSG_LEN: usize = MSG.len();
@@ -440,3 +441,46 @@ async fn try_recv_buf_from() {
}
}
}
+
+#[tokio::test]
+async fn poll_ready() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let saddr = server.local_addr().unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let caddr = client.local_addr().unwrap();
+
+ for _ in 0..5 {
+ loop {
+ assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);
+
+ match client.try_send_to(b"hello world", saddr) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);
+
+ let mut buf = Vec::with_capacity(512);
+
+ match server.try_recv_buf_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr, caddr);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
diff --git a/tests/uds_datagram.rs b/tests/uds_datagram.rs
index 4d28468..5e5486b 100644
--- a/tests/uds_datagram.rs
+++ b/tests/uds_datagram.rs
@@ -328,3 +328,50 @@ async fn try_recv_buf_never_block() -> io::Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn poll_ready() -> io::Result<()> {
+ let dir = tempfile::tempdir().unwrap();
+ let server_path = dir.path().join("server.sock");
+ let client_path = dir.path().join("client.sock");
+
+ // Create listener
+ let server = UnixDatagram::bind(&server_path)?;
+
+ // Create socket pair
+ let client = UnixDatagram::bind(&client_path)?;
+
+ for _ in 0..5 {
+ loop {
+ poll_fn(|cx| client.poll_send_ready(cx)).await?;
+
+ match client.try_send_to(b"hello world", &server_path) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ poll_fn(|cx| server.poll_recv_ready(cx)).await?;
+
+ let mut buf = Vec::with_capacity(512);
+
+ match server.try_recv_buf_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+
+ Ok(())
+}