diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2021-11-16 18:39:59 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-11-16 18:39:59 +0000 |
commit | 5b1b46ec52426b0a27bf3a5a2908d3472a52e5d0 (patch) | |
tree | a86a1094890ceb522b599e1f37f24c2e16239fd3 | |
parent | b4b1fce325271157868568d1634fc4425e84c9ae (diff) | |
parent | 786869e7d42e26203f1c4ed10cfa383eef81ae7b (diff) | |
download | tokio-5b1b46ec52426b0a27bf3a5a2908d3472a52e5d0.tar.gz |
Update to 1.14.0 am: 1db412d2c3 am: 786869e7d4
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio/+/1892699
Change-Id: Ic71ca40bfd185d21110f7ff7ee5de5a62e698a68
136 files changed, 2927 insertions, 585 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 0577e49..92e2ebe 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "1ed89aa5cf7e5b9524b9e08a02030d222fd63417" + "sha1": "623c09c52c2c38a8d75e94c166593547e8477707" } } diff --git a/CHANGELOG.md b/CHANGELOG.md index 16e44e5..afa8bf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,102 @@ +# 1.14.0 (November 15, 2021) + +### Fixed + +- macros: fix compiler errors when using `mut` patterns in `select!` ([#4211]) +- sync: fix a data race between `oneshot::Sender::send` and awaiting a + `oneshot::Receiver` when the oneshot has been closed ([#4226]) +- sync: make `AtomicWaker` panic safe ([#3689]) +- runtime: fix basic scheduler dropping tasks outside a runtime context + ([#4213]) + +### Added + +- stats: add `RuntimeStats::busy_duration_total` ([#4179], [#4223]) + +### Changed + +- io: updated `copy` buffer size to match `std::io::copy` ([#4209]) + +### Documented + +- io: rename buffer to file in doc-test ([#4230]) +- sync: fix Notify example ([#4212]) + +[#4211]: https://github.com/tokio-rs/tokio/pull/4211 +[#4226]: https://github.com/tokio-rs/tokio/pull/4226 +[#3689]: https://github.com/tokio-rs/tokio/pull/3689 +[#4213]: https://github.com/tokio-rs/tokio/pull/4213 +[#4179]: https://github.com/tokio-rs/tokio/pull/4179 +[#4223]: https://github.com/tokio-rs/tokio/pull/4223 +[#4209]: https://github.com/tokio-rs/tokio/pull/4209 +[#4230]: https://github.com/tokio-rs/tokio/pull/4230 +[#4212]: https://github.com/tokio-rs/tokio/pull/4212 + +# 1.13.1 (November 15, 2021) + +### Fixed + +- sync: fix a data race between `oneshot::Sender::send` and awaiting a + `oneshot::Receiver` when the oneshot has been closed ([#4226]) + +[#4226]: https://github.com/tokio-rs/tokio/pull/4226 + +# 1.13.0 (October 29, 2021) + +### Fixed + +- sync: fix `Notify` to clone the waker before locking its waiter list ([#4129]) +- tokio: add riscv32 to non atomic64 architectures ([#4185]) + +### Added + +- net: add `poll_{recv,send}_ready` methods to `udp` and `uds_datagram` ([#4131]) +- net: add `try_*`, `readable`, `writable`, `ready`, and `peer_addr` methods to split halves ([#4120]) +- sync: add `blocking_lock` to `Mutex` ([#4130]) +- sync: add `watch::Sender::send_replace` ([#3962], [#4195]) +- sync: expand `Debug` for `Mutex<T>` impl to unsized `T` ([#4134]) +- tracing: instrument time::Sleep ([#4072]) +- tracing: use structured location fields for spawned tasks ([#4128]) + +### Changed + +- io: add assert in `copy_bidirectional` that `poll_write` is sensible ([#4125]) +- macros: use qualified syntax when polling in `select!` ([#4192]) +- runtime: handle `block_on` wakeups better ([#4157]) +- task: allocate callback on heap immediately in debug mode ([#4203]) +- tokio: assert platform-minimum requirements at build time ([#3797]) + +### Documented + +- docs: conversion of doc comments to indicative mood ([#4174]) +- docs: add returning on the first error example for `try_join!` ([#4133]) +- docs: fixing broken links in `tokio/src/lib.rs` ([#4132]) +- signal: add example with background listener ([#4171]) +- sync: add more oneshot examples ([#4153]) +- time: document `Interval::tick` cancel safety ([#4152]) + +[#3797]: https://github.com/tokio-rs/tokio/pull/3797 +[#3962]: https://github.com/tokio-rs/tokio/pull/3962 +[#4072]: https://github.com/tokio-rs/tokio/pull/4072 +[#4120]: https://github.com/tokio-rs/tokio/pull/4120 +[#4125]: https://github.com/tokio-rs/tokio/pull/4125 +[#4128]: https://github.com/tokio-rs/tokio/pull/4128 +[#4129]: https://github.com/tokio-rs/tokio/pull/4129 +[#4130]: https://github.com/tokio-rs/tokio/pull/4130 +[#4131]: https://github.com/tokio-rs/tokio/pull/4131 +[#4132]: https://github.com/tokio-rs/tokio/pull/4132 +[#4133]: https://github.com/tokio-rs/tokio/pull/4133 +[#4134]: https://github.com/tokio-rs/tokio/pull/4134 +[#4152]: https://github.com/tokio-rs/tokio/pull/4152 +[#4153]: https://github.com/tokio-rs/tokio/pull/4153 +[#4157]: https://github.com/tokio-rs/tokio/pull/4157 +[#4171]: https://github.com/tokio-rs/tokio/pull/4171 +[#4174]: https://github.com/tokio-rs/tokio/pull/4174 +[#4185]: https://github.com/tokio-rs/tokio/pull/4185 +[#4192]: https://github.com/tokio-rs/tokio/pull/4192 +[#4195]: https://github.com/tokio-rs/tokio/pull/4195 +[#4203]: https://github.com/tokio-rs/tokio/pull/4203 + # 1.12.0 (September 21, 2021) ### Fixed @@ -14,10 +113,6 @@ - runtime: callback when a worker parks and unparks ([#4070]) - sync: implement `try_recv` for mpsc channels ([#4113]) -### Changed - -- macros: run runtime inside `LocalSet` when using macro ([#4027]) - ### Documented - docs: clarify CPU-bound tasks on Tokio ([#4105]) @@ -3,21 +3,20 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" name = "tokio" -version = "1.12.0" +version = "1.14.0" authors = ["Tokio Contributors <team@tokio.rs>"] description = "An event-driven, non-blocking I/O platform for writing asynchronous I/O\nbacked applications.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio/1.12.0/tokio/" +documentation = "https://docs.rs/tokio/1.14.0/tokio/" readme = "README.md" keywords = ["io", "async", "non-blocking", "futures"] categories = ["asynchronous", "network-programming"] @@ -57,7 +56,7 @@ optional = true version = "0.2.0" [dependencies.tokio-macros] -version = "1.1.0" +version = "1.6.0" optional = true [dev-dependencies.async-stream] version = "0.3" @@ -112,7 +111,7 @@ features = ["futures", "checkpoint"] version = "0.6.0" features = ["tokio"] [target."cfg(tokio_unstable)".dependencies.tracing] -version = "0.1.21" +version = "0.1.25" features = ["std"] optional = true default-features = false diff --git a/Cargo.toml.orig b/Cargo.toml.orig index d2e4696..348ec46 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -7,12 +7,12 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. -version = "1.12.0" +version = "1.14.0" edition = "2018" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" readme = "README.md" -documentation = "https://docs.rs/tokio/1.12.0/tokio/" +documentation = "https://docs.rs/tokio/1.14.0/tokio/" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" description = """ @@ -87,7 +87,7 @@ test-util = ["rt", "sync", "time"] time = [] [dependencies] -tokio-macros = { version = "1.1.0", path = "../tokio-macros", optional = true } +tokio-macros = { version = "1.6.0", path = "../tokio-macros", optional = true } pin-project-lite = "0.2.0" @@ -102,7 +102,7 @@ parking_lot = { version = "0.11.0", optional = true } # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. [target.'cfg(tokio_unstable)'.dependencies] -tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full +tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] libc = { version = "0.2.42", optional = true } @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio/tokio-1.12.0.crate" + value: "https://static.crates.io/crates/tokio/tokio-1.14.0.crate" } - version: "1.12.0" + version: "1.14.0" license_type: NOTICE last_upgrade_date { year: 2021 - month: 9 - day: 30 + month: 11 + day: 16 } } @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.12.0", features = ["full"] } +tokio = { version = "1.14.0", features = ["full"] } ``` Then, on your main.rs: diff --git a/patches/test_fix.patch b/patches/test_fix.patch deleted file mode 100644 index efc8c27..0000000 --- a/patches/test_fix.patch +++ /dev/null @@ -1,21 +0,0 @@ -diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs -index a70f49b81a..f8a35d0ede 100644 ---- a/tests/task_local_set.rs -+++ b/tests/task_local_set.rs -@@ -16,16 +16,6 @@ use std::sync::atomic::Ordering::{self, SeqCst}; - use std::sync::atomic::{AtomicBool, AtomicUsize}; - use std::time::Duration; - --#[tokio::test(flavor = "current_thread")] --async fn localset_implicit_current_thread() { -- task::spawn_local(async {}).await.unwrap(); --} -- --#[tokio::test(flavor = "multi_thread")] --async fn localset_implicit_multi_thread() { -- task::spawn_local(async {}).await.unwrap(); --} -- - #[tokio::test(flavor = "current_thread")] - async fn local_basic_scheduler() { - LocalSet::new()
\ No newline at end of file diff --git a/src/coop.rs b/src/coop.rs index 16d93fb..256e962 100644 --- a/src/coop.rs +++ b/src/coop.rs @@ -69,14 +69,14 @@ cfg_rt_multi_thread! { } } -/// Run the given closure with a cooperative task budget. When the function +/// Runs the given closure with a cooperative task budget. When the function /// returns, the budget is reset to the value prior to calling the function. #[inline(always)] pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R { with_budget(Budget::initial(), f) } -/// Run the given closure with an unconstrained task budget. When the function returns, the budget +/// Runs the given closure with an unconstrained task budget. When the function returns, the budget /// is reset to the value prior to calling the function. #[inline(always)] pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R { @@ -108,7 +108,7 @@ fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R { } cfg_rt_multi_thread! { - /// Set the current task's budget + /// Sets the current task's budget. pub(crate) fn set(budget: Budget) { CURRENT.with(|cell| cell.set(budget)) } @@ -120,7 +120,7 @@ cfg_rt_multi_thread! { } cfg_rt! { - /// Forcibly remove the budgeting constraints early. + /// Forcibly removes the budgeting constraints early. /// /// Returns the remaining budget pub(crate) fn stop() -> Budget { @@ -186,7 +186,7 @@ cfg_coop! { } impl Budget { - /// Decrement the budget. Returns `true` if successful. Decrementing fails + /// Decrements the budget. Returns `true` if successful. Decrementing fails /// when there is not enough remaining budget. fn decrement(&mut self) -> bool { if let Some(num) = &mut self.0 { diff --git a/src/fs/create_dir.rs b/src/fs/create_dir.rs index e03b04d..4119695 100644 --- a/src/fs/create_dir.rs +++ b/src/fs/create_dir.rs @@ -3,7 +3,7 @@ use crate::fs::asyncify; use std::io; use std::path::Path; -/// Creates a new, empty directory at the provided path +/// Creates a new, empty directory at the provided path. /// /// This is an async version of [`std::fs::create_dir`][std] /// diff --git a/src/fs/dir_builder.rs b/src/fs/dir_builder.rs index b184934..97168bf 100644 --- a/src/fs/dir_builder.rs +++ b/src/fs/dir_builder.rs @@ -14,7 +14,7 @@ pub struct DirBuilder { /// Indicates whether to create parent directories if they are missing. recursive: bool, - /// Set the Unix mode for newly created directories. + /// Sets the Unix mode for newly created directories. #[cfg(unix)] pub(super) mode: Option<u32>, } diff --git a/src/fs/file.rs b/src/fs/file.rs index 5286e6c..61071cf 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -74,7 +74,7 @@ use std::fs::File as StdFile; /// # } /// ``` /// -/// Read the contents of a file into a buffer +/// Read the contents of a file into a buffer: /// /// ```no_run /// use tokio::fs::File; @@ -383,7 +383,7 @@ impl File { asyncify(move || std.metadata()).await } - /// Create a new `File` instance that shares the same underlying file handle + /// Creates a new `File` instance that shares the same underlying file handle /// as the existing `File` instance. Reads, writes, and seeks will affect both /// File instances simultaneously. /// diff --git a/src/fs/open_options.rs b/src/fs/open_options.rs index 3e73529..f3b4654 100644 --- a/src/fs/open_options.rs +++ b/src/fs/open_options.rs @@ -430,7 +430,7 @@ feature! { self } - /// Pass custom flags to the `flags` argument of `open`. + /// Passes custom flags to the `flags` argument of `open`. /// /// The bits that define the access mode are masked out with `O_ACCMODE`, to /// ensure they do not interfere with the access mode set by Rusts options. diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index 514d59c..281ea4c 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -34,7 +34,7 @@ pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> { Ok(ReadDir(State::Idle(Some(std)))) } -/// Read the the entries in a directory. +/// Reads the the entries in a directory. /// /// This struct is returned from the [`read_dir`] function of this module and /// will yield instances of [`DirEntry`]. Through a [`DirEntry`] information @@ -287,7 +287,7 @@ impl DirEntry { asyncify(move || std.file_type()).await } - /// Returns a reference to the underlying `std::fs::DirEntry` + /// Returns a reference to the underlying `std::fs::DirEntry`. #[cfg(unix)] pub(super) fn as_inner(&self) -> &std::fs::DirEntry { &self.0 diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs index 1e083ad..486efbe 100644 --- a/src/future/maybe_done.rs +++ b/src/future/maybe_done.rs @@ -1,4 +1,4 @@ -//! Definition of the MaybeDone combinator +//! Definition of the MaybeDone combinator. use std::future::Future; use std::mem; @@ -8,9 +8,9 @@ use std::task::{Context, Poll}; /// A future that may have completed. #[derive(Debug)] pub enum MaybeDone<Fut: Future> { - /// A not-yet-completed future + /// A not-yet-completed future. Future(Fut), - /// The output of the completed future + /// The output of the completed future. Done(Fut::Output), /// The empty variant after the result of a [`MaybeDone`] has been /// taken using the [`take_output`](MaybeDone::take_output) method. @@ -20,7 +20,7 @@ pub enum MaybeDone<Fut: Future> { // Safe because we never generate `Pin<&mut Fut::Output>` impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {} -/// Wraps a future into a `MaybeDone` +/// Wraps a future into a `MaybeDone`. pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> { MaybeDone::Future(future) } diff --git a/src/future/poll_fn.rs b/src/future/poll_fn.rs index 0169bd5..d82ce89 100644 --- a/src/future/poll_fn.rs +++ b/src/future/poll_fn.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] -//! Definition of the `PollFn` adapter combinator +//! Definition of the `PollFn` adapter combinator. use std::fmt; use std::future::Future; diff --git a/src/io/async_fd.rs b/src/io/async_fd.rs index fa5bec5..9ec5b7f 100644 --- a/src/io/async_fd.rs +++ b/src/io/async_fd.rs @@ -205,13 +205,13 @@ impl<T: AsRawFd> AsyncFd<T> { }) } - /// Returns a shared reference to the backing object of this [`AsyncFd`] + /// Returns a shared reference to the backing object of this [`AsyncFd`]. #[inline] pub fn get_ref(&self) -> &T { self.inner.as_ref().unwrap() } - /// Returns a mutable reference to the backing object of this [`AsyncFd`] + /// Returns a mutable reference to the backing object of this [`AsyncFd`]. #[inline] pub fn get_mut(&mut self) -> &mut T { self.inner.as_mut().unwrap() diff --git a/src/io/blocking.rs b/src/io/blocking.rs index 94a3484..1d79ee7 100644 --- a/src/io/blocking.rs +++ b/src/io/blocking.rs @@ -16,7 +16,7 @@ use self::State::*; pub(crate) struct Blocking<T> { inner: Option<T>, state: State<T>, - /// `true` if the lower IO layer needs flushing + /// `true` if the lower IO layer needs flushing. need_flush: bool, } @@ -175,7 +175,7 @@ where } } -/// Repeats operations that are interrupted +/// Repeats operations that are interrupted. macro_rules! uninterruptibly { ($e:expr) => {{ loop { diff --git a/src/io/bsd/poll_aio.rs b/src/io/bsd/poll_aio.rs index a765d76..f1ac4b2 100644 --- a/src/io/bsd/poll_aio.rs +++ b/src/io/bsd/poll_aio.rs @@ -1,4 +1,4 @@ -//! Use POSIX AIO futures with Tokio +//! Use POSIX AIO futures with Tokio. use crate::io::driver::{Handle, Interest, ReadyEvent, Registration}; use mio::event::Source; @@ -16,14 +16,14 @@ use std::task::{Context, Poll}; /// Tokio's consumer must pass an implementor of this trait to create a /// [`Aio`] object. pub trait AioSource { - /// Register this AIO event source with Tokio's reactor + /// Registers this AIO event source with Tokio's reactor. fn register(&mut self, kq: RawFd, token: usize); - /// Deregister this AIO event source with Tokio's reactor + /// Deregisters this AIO event source with Tokio's reactor. fn deregister(&mut self); } -/// Wrap the user's AioSource in order to implement mio::event::Source, which +/// Wraps the user's AioSource in order to implement mio::event::Source, which /// is what the rest of the crate wants. struct MioSource<T>(T); diff --git a/src/io/driver/interest.rs b/src/io/driver/interest.rs index c5b18ed..d6b46df 100644 --- a/src/io/driver/interest.rs +++ b/src/io/driver/interest.rs @@ -5,7 +5,7 @@ use crate::io::driver::Ready; use std::fmt; use std::ops; -/// Readiness event interest +/// Readiness event interest. /// /// Specifies the readiness events the caller is interested in when awaiting on /// I/O resource readiness states. @@ -17,19 +17,19 @@ impl Interest { // The non-FreeBSD definitions in this block are active only when // building documentation. cfg_aio! { - /// Interest for POSIX AIO + /// Interest for POSIX AIO. #[cfg(target_os = "freebsd")] pub const AIO: Interest = Interest(mio::Interest::AIO); - /// Interest for POSIX AIO + /// Interest for POSIX AIO. #[cfg(not(target_os = "freebsd"))] pub const AIO: Interest = Interest(mio::Interest::READABLE); - /// Interest for POSIX AIO lio_listio events + /// Interest for POSIX AIO lio_listio events. #[cfg(target_os = "freebsd")] pub const LIO: Interest = Interest(mio::Interest::LIO); - /// Interest for POSIX AIO lio_listio events + /// Interest for POSIX AIO lio_listio events. #[cfg(not(target_os = "freebsd"))] pub const LIO: Interest = Interest(mio::Interest::READABLE); } @@ -39,7 +39,7 @@ impl Interest { /// Readable interest includes read-closed events. pub const READABLE: Interest = Interest(mio::Interest::READABLE); - /// Interest in all writable events + /// Interest in all writable events. /// /// Writable interest includes write-closed events. pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); diff --git a/src/io/driver/mod.rs b/src/io/driver/mod.rs index 1511884..19f67a2 100644 --- a/src/io/driver/mod.rs +++ b/src/io/driver/mod.rs @@ -23,10 +23,10 @@ use std::io; use std::sync::{Arc, Weak}; use std::time::Duration; -/// I/O driver, backed by Mio +/// I/O driver, backed by Mio. pub(crate) struct Driver { /// Tracks the number of times `turn` is called. It is safe for this to wrap - /// as it is mostly used to determine when to call `compact()` + /// as it is mostly used to determine when to call `compact()`. tick: u8, /// Reuse the `mio::Events` value across calls to poll. @@ -35,17 +35,17 @@ pub(crate) struct Driver { /// Primary slab handle containing the state for each resource registered /// with this driver. During Drop this is moved into the Inner structure, so /// this is an Option to allow it to be vacated (until Drop this is always - /// Some) + /// Some). resources: Option<Slab<ScheduledIo>>, - /// The system event queue + /// The system event queue. poll: mio::Poll, /// State shared between the reactor and the handles. inner: Arc<Inner>, } -/// A reference to an I/O driver +/// A reference to an I/O driver. #[derive(Clone)] pub(crate) struct Handle { inner: Weak<Inner>, @@ -66,13 +66,13 @@ pub(super) struct Inner { /// without risking new ones being registered in the meantime. resources: Mutex<Option<Slab<ScheduledIo>>>, - /// Registers I/O resources + /// Registers I/O resources. registry: mio::Registry, /// Allocates `ScheduledIo` handles when creating new resources. pub(super) io_dispatch: slab::Allocator<ScheduledIo>, - /// Used to wake up the reactor from a call to `turn` + /// Used to wake up the reactor from a call to `turn`. waker: mio::Waker, } @@ -253,7 +253,7 @@ impl fmt::Debug for Driver { cfg_rt! { impl Handle { - /// Returns a handle to the current reactor + /// Returns a handle to the current reactor. /// /// # Panics /// @@ -267,7 +267,7 @@ cfg_rt! { cfg_not_rt! { impl Handle { - /// Returns a handle to the current reactor + /// Returns a handle to the current reactor. /// /// # Panics /// diff --git a/src/io/driver/ready.rs b/src/io/driver/ready.rs index 305dc91..2430d30 100644 --- a/src/io/driver/ready.rs +++ b/src/io/driver/ready.rs @@ -68,7 +68,7 @@ impl Ready { ready } - /// Returns true if `Ready` is the empty set + /// Returns true if `Ready` is the empty set. /// /// # Examples /// @@ -82,7 +82,7 @@ impl Ready { self == Ready::EMPTY } - /// Returns `true` if the value includes `readable` + /// Returns `true` if the value includes `readable`. /// /// # Examples /// @@ -98,7 +98,7 @@ impl Ready { self.contains(Ready::READABLE) || self.is_read_closed() } - /// Returns `true` if the value includes writable `readiness` + /// Returns `true` if the value includes writable `readiness`. /// /// # Examples /// @@ -114,7 +114,7 @@ impl Ready { self.contains(Ready::WRITABLE) || self.is_write_closed() } - /// Returns `true` if the value includes read-closed `readiness` + /// Returns `true` if the value includes read-closed `readiness`. /// /// # Examples /// @@ -129,7 +129,7 @@ impl Ready { self.contains(Ready::READ_CLOSED) } - /// Returns `true` if the value includes write-closed `readiness` + /// Returns `true` if the value includes write-closed `readiness`. /// /// # Examples /// @@ -154,7 +154,7 @@ impl Ready { (self & other) == other } - /// Create a `Ready` instance using the given `usize` representation. + /// Creates a `Ready` instance using the given `usize` representation. /// /// The `usize` representation must have been obtained from a call to /// `Readiness::as_usize`. diff --git a/src/io/driver/scheduled_io.rs b/src/io/driver/scheduled_io.rs index a265720..76f9343 100644 --- a/src/io/driver/scheduled_io.rs +++ b/src/io/driver/scheduled_io.rs @@ -36,16 +36,16 @@ cfg_io_readiness! { #[derive(Debug, Default)] struct Waiters { #[cfg(feature = "net")] - /// List of all current waiters + /// List of all current waiters. list: WaitList, - /// Waker used for AsyncRead + /// Waker used for AsyncRead. reader: Option<Waker>, - /// Waker used for AsyncWrite + /// Waker used for AsyncWrite. writer: Option<Waker>, - /// True if this ScheduledIo has been killed due to IO driver shutdown + /// True if this ScheduledIo has been killed due to IO driver shutdown. is_shutdown: bool, } @@ -54,19 +54,19 @@ cfg_io_readiness! { struct Waiter { pointers: linked_list::Pointers<Waiter>, - /// The waker for this task + /// The waker for this task. waker: Option<Waker>, - /// The interest this waiter is waiting on + /// The interest this waiter is waiting on. interest: Interest, is_ready: bool, - /// Should never be `!Unpin` + /// Should never be `!Unpin`. _p: PhantomPinned, } - /// Future returned by `readiness()` + /// Future returned by `readiness()`. struct Readiness<'a> { scheduled_io: &'a ScheduledIo, @@ -276,7 +276,7 @@ impl ScheduledIo { } } - /// Poll version of checking readiness for a certain direction. + /// Polls for readiness events in a given direction. /// /// These are to support `AsyncRead` and `AsyncWrite` polling methods, /// which cannot use the `async fn` version. This uses reserved reader @@ -363,7 +363,7 @@ unsafe impl Sync for ScheduledIo {} cfg_io_readiness! { impl ScheduledIo { - /// An async version of `poll_readiness` which uses a linked list of wakers + /// An async version of `poll_readiness` which uses a linked list of wakers. pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { self.readiness_fut(interest).await } diff --git a/src/io/mod.rs b/src/io/mod.rs index a5ee108..cfdda61 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -218,7 +218,7 @@ cfg_io_driver_impl! { } cfg_aio! { - /// BSD-specific I/O types + /// BSD-specific I/O types. pub mod bsd { mod poll_aio; diff --git a/src/io/poll_evented.rs b/src/io/poll_evented.rs index 9872574..44e68a2 100644 --- a/src/io/poll_evented.rs +++ b/src/io/poll_evented.rs @@ -113,7 +113,7 @@ impl<E: Source> PollEvented<E> { }) } - /// Returns a reference to the registration + /// Returns a reference to the registration. #[cfg(any( feature = "net", all(unix, feature = "process"), @@ -123,7 +123,7 @@ impl<E: Source> PollEvented<E> { &self.registration } - /// Deregister the inner io from the registration and returns a Result containing the inner io + /// Deregisters the inner io from the registration and returns a Result containing the inner io. #[cfg(any(feature = "net", feature = "process"))] pub(crate) fn into_inner(mut self) -> io::Result<E> { let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. diff --git a/src/io/split.rs b/src/io/split.rs index f35273f..8258a0f 100644 --- a/src/io/split.rs +++ b/src/io/split.rs @@ -90,7 +90,7 @@ impl<T> ReadHalf<T> { } impl<T> WriteHalf<T> { - /// Check if this `WriteHalf` and some `ReadHalf` were split from the same + /// Checks if this `WriteHalf` and some `ReadHalf` were split from the same /// stream. pub fn is_pair_of(&self, other: &ReadHalf<T>) -> bool { Arc::ptr_eq(&self.inner, &other.inner) diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs index 56c4520..7e4a198 100644 --- a/src/io/stdio_common.rs +++ b/src/io/stdio_common.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; /// if buffer contents seems to be utf8. Otherwise it only trims buffer down to MAX_BUF. /// That's why, wrapped writer will always receive well-formed utf-8 bytes. /// # Other platforms -/// passes data to `inner` as is +/// Passes data to `inner` as is. #[derive(Debug)] pub(crate) struct SplitByUtf8BoundaryIfWindows<W> { inner: W, diff --git a/src/io/util/async_write_ext.rs b/src/io/util/async_write_ext.rs index a1f77f8..93a3183 100644 --- a/src/io/util/async_write_ext.rs +++ b/src/io/util/async_write_ext.rs @@ -20,7 +20,7 @@ use std::io::IoSlice; use bytes::Buf; cfg_io_util! { - /// Defines numeric writer + /// Defines numeric writer. macro_rules! write_impl { ( $( @@ -256,7 +256,7 @@ cfg_io_util! { write_buf(self, src) } - /// Attempts to write an entire buffer into this writer + /// Attempts to write an entire buffer into this writer. /// /// Equivalent to: /// @@ -353,9 +353,9 @@ cfg_io_util! { /// /// #[tokio::main] /// async fn main() -> io::Result<()> { - /// let mut buffer = File::create("foo.txt").await?; + /// let mut file = File::create("foo.txt").await?; /// - /// buffer.write_all(b"some bytes").await?; + /// file.write_all(b"some bytes").await?; /// Ok(()) /// } /// ``` diff --git a/src/io/util/buf_reader.rs b/src/io/util/buf_reader.rs index 7cfd46c..7df610b 100644 --- a/src/io/util/buf_reader.rs +++ b/src/io/util/buf_reader.rs @@ -155,7 +155,7 @@ pub(super) enum SeekState { Pending, } -/// Seek to an offset, in bytes, in the underlying reader. +/// Seeks to an offset, in bytes, in the underlying reader. /// /// The position used for seeking with `SeekFrom::Current(_)` is the /// position the underlying reader would be at if the `BufReader` had no diff --git a/src/io/util/copy.rs b/src/io/util/copy.rs index fbd77b5..d0ab7cb 100644 --- a/src/io/util/copy.rs +++ b/src/io/util/copy.rs @@ -23,7 +23,7 @@ impl CopyBuffer { pos: 0, cap: 0, amt: 0, - buf: vec![0; 2048].into_boxed_slice(), + buf: vec![0; super::DEFAULT_BUF_SIZE].into_boxed_slice(), } } @@ -84,6 +84,14 @@ impl CopyBuffer { } } + // If pos larger than cap, this loop will never stop. + // In particular, user's wrong poll_write implementation returning + // incorrect written length may lead to thread blocking. + debug_assert!( + self.pos <= self.cap, + "writer returned length larger than input slice" + ); + // If we've written all the data and we've seen EOF, flush out the // data and finish the transfer. if self.pos == self.cap && self.read_done { diff --git a/src/io/util/lines.rs b/src/io/util/lines.rs index 3fbf5e3..717f633 100644 --- a/src/io/util/lines.rs +++ b/src/io/util/lines.rs @@ -8,7 +8,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Read lines from an [`AsyncBufRead`]. + /// Reads lines from an [`AsyncBufRead`]. /// /// A `Lines` can be turned into a `Stream` with [`LinesStream`]. /// @@ -72,12 +72,12 @@ where poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await } - /// Obtain a mutable reference to the underlying reader + /// Obtains a mutable reference to the underlying reader. pub fn get_mut(&mut self) -> &mut R { &mut self.reader } - /// Obtain a reference to the underlying reader + /// Obtains a reference to the underlying reader. pub fn get_ref(&mut self) -> &R { &self.reader } @@ -16,6 +16,10 @@ attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] #![cfg_attr(docsrs, feature(doc_cfg))] +#![cfg_attr(docsrs, feature(doc_cfg_hide))] +#![cfg_attr(docsrs, doc(cfg_hide(docsrs)))] +#![cfg_attr(docsrs, doc(cfg_hide(loom)))] +#![cfg_attr(docsrs, doc(cfg_hide(not(loom))))] #![cfg_attr(docsrs, allow(unused_attributes))] //! A runtime for writing reliable network applications without compromising speed. @@ -161,8 +165,8 @@ //! [`tokio::runtime`]: crate::runtime //! [`Builder`]: crate::runtime::Builder //! [`Runtime`]: crate::runtime::Runtime -//! [rt]: runtime/index.html#basic-scheduler -//! [rt-multi-thread]: runtime/index.html#threaded-scheduler +//! [rt]: runtime/index.html#current-thread-scheduler +//! [rt-multi-thread]: runtime/index.html#multi-thread-scheduler //! [rt-features]: runtime/index.html#runtime-scheduler //! //! ## CPU-bound tasks and blocking code @@ -350,6 +354,19 @@ //! //! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section +// Test that pointer width is compatible. This asserts that e.g. usize is at +// least 32 bits, which a lot of components in Tokio currently assumes. +// +// TODO: improve once we have MSRV access to const eval to make more flexible. +#[cfg(not(any( + target_pointer_width = "32", + target_pointer_width = "64", + target_pointer_width = "128" +)))] +compile_error! { + "Tokio requires the platform pointer width to be 32, 64, or 128 bits" +} + // Includes re-exports used by macros. // // This module is not intended to be part of the public API. In general, any @@ -480,6 +497,12 @@ cfg_macros! { #[doc(hidden)] pub use tokio_macros::select_priv_declare_output_enum; + /// Implementation detail of the `select!` macro. This macro is **not** + /// intended to be used as part of the public API and is permitted to + /// change. + #[doc(hidden)] + pub use tokio_macros::select_priv_clean_pattern; + cfg_rt! { #[cfg(feature = "rt-multi-thread")] #[cfg(not(test))] // Work around for rust-lang/rust#62127 diff --git a/src/loom/std/mutex.rs b/src/loom/std/mutex.rs index bf14d62..3f686e0 100644 --- a/src/loom/std/mutex.rs +++ b/src/loom/std/mutex.rs @@ -1,7 +1,7 @@ use std::sync::{self, MutexGuard, TryLockError}; /// Adapter for `std::Mutex` that removes the poisoning aspects -// from its api +/// from its api. #[derive(Debug)] pub(crate) struct Mutex<T: ?Sized>(sync::Mutex<T>); diff --git a/src/macros/cfg.rs b/src/macros/cfg.rs index 193bcd7..606bce7 100644 --- a/src/macros/cfg.rs +++ b/src/macros/cfg.rs @@ -13,7 +13,7 @@ macro_rules! feature { } } -/// Enables enter::block_on +/// Enables enter::block_on. macro_rules! cfg_block_on { ($($item:item)*) => { $( @@ -28,7 +28,7 @@ macro_rules! cfg_block_on { } } -/// Enables internal `AtomicWaker` impl +/// Enables internal `AtomicWaker` impl. macro_rules! cfg_atomic_waker_impl { ($($item:item)*) => { $( @@ -99,6 +99,7 @@ macro_rules! cfg_io_driver_impl { feature = "process", all(unix, feature = "signal"), ))] + #[cfg_attr(docsrs, doc(cfg(all())))] $item )* } @@ -422,7 +423,8 @@ macro_rules! cfg_has_atomic_u64 { #[cfg(not(any( target_arch = "arm", target_arch = "mips", - target_arch = "powerpc" + target_arch = "powerpc", + target_arch = "riscv32" )))] $item )* @@ -435,7 +437,8 @@ macro_rules! cfg_not_has_atomic_u64 { #[cfg(any( target_arch = "arm", target_arch = "mips", - target_arch = "powerpc" + target_arch = "powerpc", + target_arch = "riscv32" ))] $item )* diff --git a/src/macros/join.rs b/src/macros/join.rs index 5f37af5..f91b5f1 100644 --- a/src/macros/join.rs +++ b/src/macros/join.rs @@ -1,4 +1,4 @@ -/// Wait on multiple concurrent branches, returning when **all** branches +/// Waits on multiple concurrent branches, returning when **all** branches /// complete. /// /// The `join!` macro must be used inside of async functions, closures, and diff --git a/src/macros/mod.rs b/src/macros/mod.rs index b0af521..a1839c8 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -15,6 +15,11 @@ mod ready; #[macro_use] mod thread_local; +cfg_trace! { + #[macro_use] + mod trace; +} + #[macro_use] #[cfg(feature = "rt")] pub(crate) mod scoped_tls; diff --git a/src/macros/scoped_tls.rs b/src/macros/scoped_tls.rs index a00aae2..f2504cb 100644 --- a/src/macros/scoped_tls.rs +++ b/src/macros/scoped_tls.rs @@ -3,7 +3,7 @@ use crate::loom::thread::LocalKey; use std::cell::Cell; use std::marker; -/// Set a reference as a thread-local +/// Sets a reference as a thread-local. macro_rules! scoped_thread_local { ($(#[$attrs:meta])* $vis:vis static $name:ident: $ty:ty) => ( $(#[$attrs])* diff --git a/src/macros/select.rs b/src/macros/select.rs index a90ee9e..051f8cb 100644 --- a/src/macros/select.rs +++ b/src/macros/select.rs @@ -1,4 +1,4 @@ -/// Wait on multiple concurrent branches, returning when the **first** branch +/// Waits on multiple concurrent branches, returning when the **first** branch /// completes, cancelling the remaining branches. /// /// The `select!` macro must be used inside of async functions, closures, and @@ -502,7 +502,7 @@ macro_rules! select { let mut fut = unsafe { Pin::new_unchecked(fut) }; // Try polling it - let out = match fut.poll(cx) { + let out = match Future::poll(fut, cx) { Ready(out) => out, Pending => { // Track that at least one future is @@ -520,7 +520,7 @@ macro_rules! select { #[allow(unused_variables)] #[allow(unused_mut)] match &out { - $bind => {} + $crate::select_priv_clean_pattern!($bind) => {} _ => continue, } diff --git a/src/macros/trace.rs b/src/macros/trace.rs new file mode 100644 index 0000000..31dde2f --- /dev/null +++ b/src/macros/trace.rs @@ -0,0 +1,27 @@ +cfg_trace! { + macro_rules! trace_op { + ($name:literal, $readiness:literal, $parent:expr) => { + tracing::trace!( + target: "runtime::resource::poll_op", + parent: $parent, + op_name = $name, + is_ready = $readiness + ); + } + } + + macro_rules! trace_poll_op { + ($name:literal, $poll:expr, $parent:expr $(,)*) => { + match $poll { + std::task::Poll::Ready(t) => { + trace_op!($name, true, $parent); + std::task::Poll::Ready(t) + } + std::task::Poll::Pending => { + trace_op!($name, false, $parent); + return std::task::Poll::Pending; + } + } + }; + } +} diff --git a/src/macros/try_join.rs b/src/macros/try_join.rs index fa5850e..6d3a893 100644 --- a/src/macros/try_join.rs +++ b/src/macros/try_join.rs @@ -1,4 +1,4 @@ -/// Wait on multiple concurrent branches, returning when **all** branches +/// Waits on multiple concurrent branches, returning when **all** branches /// complete with `Ok(_)` or on the first `Err(_)`. /// /// The `try_join!` macro must be used inside of async functions, closures, and @@ -59,6 +59,45 @@ /// } /// } /// ``` +/// +/// Using `try_join!` with spawned tasks. +/// +/// ``` +/// use tokio::task::JoinHandle; +/// +/// async fn do_stuff_async() -> Result<(), &'static str> { +/// // async work +/// # Err("failed") +/// } +/// +/// async fn more_async_work() -> Result<(), &'static str> { +/// // more here +/// # Ok(()) +/// } +/// +/// async fn flatten<T>(handle: JoinHandle<Result<T, &'static str>>) -> Result<T, &'static str> { +/// match handle.await { +/// Ok(Ok(result)) => Ok(result), +/// Ok(Err(err)) => Err(err), +/// Err(err) => Err("handling failed"), +/// } +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let handle1 = tokio::spawn(do_stuff_async()); +/// let handle2 = tokio::spawn(more_async_work()); +/// match tokio::try_join!(flatten(handle1), flatten(handle2)) { +/// Ok(val) => { +/// // do something with the values +/// } +/// Err(err) => { +/// println!("Failed with {}.", err); +/// # assert_eq!(err, "failed"); +/// } +/// } +/// } +/// ``` #[macro_export] #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] macro_rules! try_join { diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 86f0ec1..8aecb21 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -227,7 +227,7 @@ impl TcpListener { Ok(TcpListener { io }) } - /// Turn a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`]. + /// Turns a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`]. /// /// The returned [`std::net::TcpListener`] will have nonblocking mode set as /// `true`. Use [`set_nonblocking`] to change the blocking mode if needed. diff --git a/src/net/tcp/mod.rs b/src/net/tcp/mod.rs index 7f0f6d9..cb8a8b2 100644 --- a/src/net/tcp/mod.rs +++ b/src/net/tcp/mod.rs @@ -1,4 +1,4 @@ -//! TCP utility types +//! TCP utility types. pub(crate) mod listener; diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs index 02cb637..f54ff95 100644 --- a/src/net/tcp/socket.rs +++ b/src/net/tcp/socket.rs @@ -87,7 +87,7 @@ cfg_net! { } impl TcpSocket { - /// Create a new socket configured for IPv4. + /// Creates a new socket configured for IPv4. /// /// Calls `socket(2)` with `AF_INET` and `SOCK_STREAM`. /// @@ -121,7 +121,7 @@ impl TcpSocket { Ok(TcpSocket { inner }) } - /// Create a new socket configured for IPv6. + /// Creates a new socket configured for IPv6. /// /// Calls `socket(2)` with `AF_INET6` and `SOCK_STREAM`. /// @@ -155,7 +155,7 @@ impl TcpSocket { Ok(TcpSocket { inner }) } - /// Allow the socket to bind to an in-use address. + /// Allows the socket to bind to an in-use address. /// /// Behavior is platform specific. Refer to the target platform's /// documentation for more details. @@ -185,7 +185,7 @@ impl TcpSocket { self.inner.set_reuseaddr(reuseaddr) } - /// Retrieves the value set for `SO_REUSEADDR` on this socket + /// Retrieves the value set for `SO_REUSEADDR` on this socket. /// /// # Examples /// @@ -211,7 +211,7 @@ impl TcpSocket { self.inner.get_reuseaddr() } - /// Allow the socket to bind to an in-use port. Only available for unix systems + /// Allows the socket to bind to an in-use port. Only available for unix systems /// (excluding Solaris & Illumos). /// /// Behavior is platform specific. Refer to the target platform's @@ -245,7 +245,7 @@ impl TcpSocket { self.inner.set_reuseport(reuseport) } - /// Allow the socket to bind to an in-use port. Only available for unix systems + /// Allows the socket to bind to an in-use port. Only available for unix systems /// (excluding Solaris & Illumos). /// /// Behavior is platform specific. Refer to the target platform's @@ -348,7 +348,7 @@ impl TcpSocket { self.inner.get_recv_buffer_size() } - /// Get the local address of this socket. + /// Gets the local address of this socket. /// /// Will fail on windows if called before `bind`. /// @@ -374,7 +374,7 @@ impl TcpSocket { self.inner.get_localaddr() } - /// Bind the socket to the given address. + /// Binds the socket to the given address. /// /// This calls the `bind(2)` operating-system function. Behavior is /// platform specific. Refer to the target platform's documentation for more @@ -406,7 +406,7 @@ impl TcpSocket { self.inner.bind(addr) } - /// Establish a TCP connection with a peer at the specified socket address. + /// Establishes a TCP connection with a peer at the specified socket address. /// /// The `TcpSocket` is consumed. Once the connection is established, a /// connected [`TcpStream`] is returned. If the connection fails, the @@ -443,7 +443,7 @@ impl TcpSocket { TcpStream::connect_mio(mio).await } - /// Convert the socket into a `TcpListener`. + /// Converts the socket into a `TcpListener`. /// /// `backlog` defines the maximum number of pending connections are queued /// by the operating system at any given time. Connection are removed from diff --git a/src/net/tcp/split.rs b/src/net/tcp/split.rs index 8ae70ce..0e02928 100644 --- a/src/net/tcp/split.rs +++ b/src/net/tcp/split.rs @@ -9,14 +9,18 @@ //! level. use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; use crate::net::TcpStream; use std::io; -use std::net::Shutdown; +use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; +cfg_io_util! { + use bytes::BufMut; +} + /// Borrowed read half of a [`TcpStream`], created by [`split`]. /// /// Reading from a `ReadHalf` is usually done using the convenience methods found on the @@ -49,7 +53,7 @@ pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) { } impl ReadHalf<'_> { - /// Attempt to receive data on the socket, without removing that data from + /// Attempts to receive data on the socket, without removing that data from /// the queue, registering the current task for wakeup if data is not yet /// available. /// @@ -134,6 +138,211 @@ impl ReadHalf<'_> { let mut buf = ReadBuf::new(buf); poll_fn(|cx| self.poll_peek(cx, &mut buf)).await } + + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// This function is equivalent to [`TcpStream::ready`]. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.0.ready(interest).await + } + + /// Waits for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// This function is also equivalent to [`TcpStream::ready`]. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn readable(&self) -> io::Result<()> { + self.0.readable().await + } + + /// Tries to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.0.try_read(buf) + } + + /// Tries to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.0.try_read_vectored(bufs) + } + + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.0.try_read_buf(buf) + } + } + + /// Returns the remote address that this stream is connected to. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.0.peer_addr() + } + + /// Returns the local address that this stream is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.0.local_addr() + } +} + +impl WriteHalf<'_> { + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// This function is equivalent to [`TcpStream::ready`]. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.0.ready(interest).await + } + + /// Waits for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn writable(&self) -> io::Result<()> { + self.0.writable().await + } + + /// Tries to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + self.0.try_write(buf) + } + + /// Tries to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: Self::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.0.try_write_vectored(bufs) + } + + /// Returns the remote address that this stream is connected to. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.0.peer_addr() + } + + /// Returns the local address that this stream is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.0.local_addr() + } } impl AsyncRead for ReadHalf<'_> { diff --git a/src/net/tcp/split_owned.rs b/src/net/tcp/split_owned.rs index 1bcb4f2..ef4e7b5 100644 --- a/src/net/tcp/split_owned.rs +++ b/src/net/tcp/split_owned.rs @@ -9,16 +9,20 @@ //! level. use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; use crate::net::TcpStream; use std::error::Error; -use std::net::Shutdown; +use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::{fmt, io}; +cfg_io_util! { + use bytes::BufMut; +} + /// Owned read half of a [`TcpStream`], created by [`into_split`]. /// /// Reading from an `OwnedReadHalf` is usually done using the convenience methods found @@ -189,6 +193,128 @@ impl OwnedReadHalf { let mut buf = ReadBuf::new(buf); poll_fn(|cx| self.poll_peek(cx, &mut buf)).await } + + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// This function is equivalent to [`TcpStream::ready`]. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.inner.ready(interest).await + } + + /// Waits for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// This function is also equivalent to [`TcpStream::ready`]. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn readable(&self) -> io::Result<()> { + self.inner.readable().await + } + + /// Tries to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.try_read(buf) + } + + /// Tries to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.inner.try_read_vectored(bufs) + } + + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.inner.try_read_buf(buf) + } + } + + /// Returns the remote address that this stream is connected to. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.peer_addr() + } + + /// Returns the local address that this stream is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.local_addr() + } } impl AsyncRead for OwnedReadHalf { @@ -211,13 +337,94 @@ impl OwnedWriteHalf { reunite(other, self) } - /// Destroy the write half, but don't close the write half of the stream + /// Destroys the write half, but don't close the write half of the stream /// until the read half is dropped. If the read half has already been /// dropped, this closes the stream. pub fn forget(mut self) { self.shutdown_on_drop = false; drop(self); } + + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// This function is equivalent to [`TcpStream::ready`]. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.inner.ready(interest).await + } + + /// Waits for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn writable(&self) -> io::Result<()> { + self.inner.writable().await + } + + /// Tries to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + self.inner.try_write(buf) + } + + /// Tries to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: Self::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.inner.try_write_vectored(bufs) + } + + /// Returns the remote address that this stream is connected to. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.peer_addr() + } + + /// Returns the local address that this stream is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.local_addr() + } } impl Drop for OwnedWriteHalf { diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 34ac6ee..60d20fd 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -192,7 +192,7 @@ impl TcpStream { Ok(TcpStream { io }) } - /// Turn a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`]. + /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`]. /// /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`. /// Use [`set_nonblocking`] to change the blocking mode if needed. @@ -350,7 +350,7 @@ impl TcpStream { } } - /// Wait for any of the requested ready states. + /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_read()` or `try_write()`. It /// can be used to concurrently read / write to the same socket on a single @@ -422,7 +422,7 @@ impl TcpStream { Ok(event.ready) } - /// Wait for the socket to become readable. + /// Waits for the socket to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_read()`. @@ -510,7 +510,7 @@ impl TcpStream { self.io.registration().poll_read_ready(cx).map_ok(|_| ()) } - /// Try to read data from the stream into the provided buffer, returning how + /// Tries to read data from the stream into the provided buffer, returning how /// many bytes were read. /// /// Receives any pending data from the socket but does not wait for new data @@ -577,7 +577,7 @@ impl TcpStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } - /// Try to read data from the stream into the provided buffers, returning + /// Tries to read data from the stream into the provided buffers, returning /// how many bytes were read. /// /// Data is copied to fill each buffer in order, with the final buffer @@ -656,7 +656,7 @@ impl TcpStream { } cfg_io_util! { - /// Try to read data from the stream into the provided buffer, advancing the + /// Tries to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. /// /// Receives any pending data from the socket but does not wait for new data @@ -734,7 +734,7 @@ impl TcpStream { } } - /// Wait for the socket to become writable. + /// Waits for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually /// paired with `try_write()`. @@ -874,7 +874,7 @@ impl TcpStream { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } - /// Try to write several buffers to the stream, returning how many bytes + /// Tries to write several buffers to the stream, returning how many bytes /// were written. /// /// Data is written from each buffer in order, with the final buffer read @@ -936,7 +936,7 @@ impl TcpStream { .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs)) } - /// Try to read or write from the socket using a user-provided IO operation. + /// Tries to read or write from the socket using a user-provided IO operation. /// /// If the socket is ready, the provided closure is called. The closure /// should attempt to perform IO operation from the socket by manually diff --git a/src/net/udp.rs b/src/net/udp.rs index 75cc6f3..504d74e 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -12,7 +12,7 @@ cfg_io_util! { } cfg_net! { - /// A UDP socket + /// A UDP socket. /// /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket` /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`: @@ -211,7 +211,7 @@ impl UdpSocket { UdpSocket::new(io) } - /// Turn a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`]. + /// Turns a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`]. /// /// The returned [`std::net::UdpSocket`] will have nonblocking mode set as /// `true`. Use [`set_nonblocking`] to change the blocking mode if needed. @@ -317,7 +317,7 @@ impl UdpSocket { })) } - /// Wait for any of the requested ready states. + /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_recv()` or `try_send()`. It /// can be used to concurrently recv / send to the same socket on a single @@ -388,7 +388,7 @@ impl UdpSocket { Ok(event.ready) } - /// Wait for the socket to become writable. + /// Waits for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is /// usually paired with `try_send()` or `try_send_to()`. @@ -443,6 +443,39 @@ impl UdpSocket { Ok(()) } + /// Polls for write/send readiness. + /// + /// If the udp stream is not currently ready for sending, this method will + /// store a clone of the `Waker` from the provided `Context`. When the udp + /// stream becomes ready for sending, `Waker::wake` will be called on the + /// waker. + /// + /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the udp stream is not ready for writing. + /// * `Poll::Ready(Ok(()))` if the udp stream is ready for writing. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`writable`]: method@Self::writable + pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + /// Sends data on the socket to the remote address that the socket is /// connected to. /// @@ -516,7 +549,7 @@ impl UdpSocket { .poll_write_io(cx, || self.io.send(buf)) } - /// Try to send data on the socket to the remote address to which it is + /// Tries to send data on the socket to the remote address to which it is /// connected. /// /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is @@ -570,7 +603,7 @@ impl UdpSocket { .try_io(Interest::WRITABLE, || self.io.send(buf)) } - /// Wait for the socket to become readable. + /// Waits for the socket to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_recv()`. @@ -630,6 +663,39 @@ impl UdpSocket { Ok(()) } + /// Polls for read/receive readiness. + /// + /// If the udp stream is not currently ready for receiving, this method will + /// store a clone of the `Waker` from the provided `Context`. When the udp + /// socket becomes ready for reading, `Waker::wake` will be called on the + /// waker. + /// + /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or + /// `poll_peek`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. (However, + /// `poll_send_ready` retains a second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the udp stream is not ready for reading. + /// * `Poll::Ready(Ok(()))` if the udp stream is ready for reading. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`readable`]: method@Self::readable + pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + /// Receives a single datagram message on the socket from the remote address /// to which it is connected. On success, returns the number of bytes read. /// @@ -715,7 +781,7 @@ impl UdpSocket { Poll::Ready(Ok(())) } - /// Try to receive a single datagram message on the socket from the remote + /// Tries to receive a single datagram message on the socket from the remote /// address to which it is connected. On success, returns the number of /// bytes read. /// @@ -772,7 +838,7 @@ impl UdpSocket { } cfg_io_util! { - /// Try to receive data from the stream into the provided buffer, advancing the + /// Tries to receive data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. /// /// The function must be called with valid byte array buf of sufficient size @@ -837,7 +903,7 @@ impl UdpSocket { }) } - /// Try to receive a single datagram message on the socket. On success, + /// Tries to receive a single datagram message on the socket. On success, /// returns the number of bytes read and the origin. /// /// The function must be called with valid byte array buf of sufficient size @@ -978,7 +1044,7 @@ impl UdpSocket { .poll_write_io(cx, || self.io.send_to(buf, target)) } - /// Try to send data on the socket to the given address, but if the send is + /// Tries to send data on the socket to the given address, but if the send is /// blocked this will return right away. /// /// This function is usually paired with `writable()`. @@ -1116,7 +1182,7 @@ impl UdpSocket { Poll::Ready(Ok(addr)) } - /// Try to receive a single datagram message on the socket. On success, + /// Tries to receive a single datagram message on the socket. On success, /// returns the number of bytes read and the origin. /// /// The function must be called with valid byte array buf of sufficient size @@ -1170,7 +1236,7 @@ impl UdpSocket { .try_io(Interest::READABLE, || self.io.recv_from(buf)) } - /// Try to read or write from the socket using a user-provided IO operation. + /// Tries to read or write from the socket using a user-provided IO operation. /// /// If the socket is ready, the provided closure is called. The closure /// should attempt to perform IO operation from the socket by manually diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs index 7874b8a..d5b6186 100644 --- a/src/net/unix/datagram/socket.rs +++ b/src/net/unix/datagram/socket.rs @@ -96,7 +96,7 @@ cfg_net_unix! { } impl UnixDatagram { - /// Wait for any of the requested ready states. + /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_recv()` or `try_send()`. It /// can be used to concurrently recv / send to the same socket on a single @@ -169,7 +169,7 @@ impl UnixDatagram { Ok(event.ready) } - /// Wait for the socket to become writable. + /// Waits for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is /// usually paired with `try_send()` or `try_send_to()`. @@ -226,7 +226,40 @@ impl UnixDatagram { Ok(()) } - /// Wait for the socket to become readable. + /// Polls for write/send readiness. + /// + /// If the socket is not currently ready for sending, this method will + /// store a clone of the `Waker` from the provided `Context`. When the socket + /// becomes ready for sending, `Waker::wake` will be called on the + /// waker. + /// + /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the socket is not ready for writing. + /// * `Poll::Ready(Ok(()))` if the socket is ready for writing. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`writable`]: method@Self::writable + pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + + /// Waits for the socket to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_recv()`. @@ -289,6 +322,39 @@ impl UnixDatagram { Ok(()) } + /// Polls for read/receive readiness. + /// + /// If the socket is not currently ready for receiving, this method will + /// store a clone of the `Waker` from the provided `Context`. When the + /// socket becomes ready for reading, `Waker::wake` will be called on the + /// waker. + /// + /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or + /// `poll_peek`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. (However, + /// `poll_send_ready` retains a second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the socket is not ready for reading. + /// * `Poll::Ready(Ok(()))` if the socket is ready for reading. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`readable`]: method@Self::readable + pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + /// Creates a new `UnixDatagram` bound to the specified path. /// /// # Examples @@ -397,7 +463,7 @@ impl UnixDatagram { Ok(UnixDatagram { io }) } - /// Turn a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`]. + /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`]. /// /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode @@ -548,7 +614,7 @@ impl UnixDatagram { .await } - /// Try to send a datagram to the peer without waiting. + /// Tries to send a datagram to the peer without waiting. /// /// # Examples /// @@ -592,7 +658,7 @@ impl UnixDatagram { .try_io(Interest::WRITABLE, || self.io.send(buf)) } - /// Try to send a datagram to the peer without waiting. + /// Tries to send a datagram to the peer without waiting. /// /// # Examples /// @@ -678,7 +744,7 @@ impl UnixDatagram { .await } - /// Try to receive a datagram from the peer without waiting. + /// Tries to receive a datagram from the peer without waiting. /// /// # Examples /// @@ -729,7 +795,7 @@ impl UnixDatagram { } cfg_io_util! { - /// Try to receive data from the socket without waiting. + /// Tries to receive data from the socket without waiting. /// /// # Examples /// @@ -790,7 +856,7 @@ impl UnixDatagram { Ok((n, SocketAddr(addr))) } - /// Try to read data from the stream into the provided buffer, advancing the + /// Tries to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. /// /// # Examples @@ -1091,7 +1157,7 @@ impl UnixDatagram { Poll::Ready(Ok(())) } - /// Try to receive data from the socket without waiting. + /// Tries to receive data from the socket without waiting. /// /// # Examples /// @@ -1143,7 +1209,7 @@ impl UnixDatagram { Ok((n, SocketAddr(addr))) } - /// Try to read or write from the socket using a user-provided IO operation. + /// Tries to read or write from the socket using a user-provided IO operation. /// /// If the socket is ready, the provided closure is called. The closure /// should attempt to perform IO operation from the socket by manually diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index efb9503..1785f8b 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -88,7 +88,7 @@ impl UnixListener { Ok(UnixListener { io }) } - /// Turn a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. + /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. /// /// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed. diff --git a/src/net/unix/mod.rs b/src/net/unix/mod.rs index c3046f1..14cb456 100644 --- a/src/net/unix/mod.rs +++ b/src/net/unix/mod.rs @@ -1,4 +1,4 @@ -//! Unix domain socket utility types +//! Unix domain socket utility types. // This module does not currently provide any public API, but it was // unintentionally defined as a public module. Hide it from the documentation diff --git a/src/net/unix/split.rs b/src/net/unix/split.rs index 97214f7..d4686c2 100644 --- a/src/net/unix/split.rs +++ b/src/net/unix/split.rs @@ -8,14 +8,19 @@ //! split has no associated overhead and enforces all invariants at the type //! level. -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; use crate::net::UnixStream; +use crate::net::unix::SocketAddr; use std::io; use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; +cfg_io_util! { + use bytes::BufMut; +} + /// Borrowed read half of a [`UnixStream`], created by [`split`]. /// /// Reading from a `ReadHalf` is usually done using the convenience methods found on the @@ -47,6 +52,206 @@ pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) { (ReadHalf(stream), WriteHalf(stream)) } +impl ReadHalf<'_> { + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.0.ready(interest).await + } + + /// Waits for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn readable(&self) -> io::Result<()> { + self.0.readable().await + } + + /// Tries to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.0.try_read(buf) + } + + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.0.try_read_buf(buf) + } + } + + /// Tries to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.0.try_read_vectored(bufs) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.0.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.0.local_addr() + } +} + +impl WriteHalf<'_> { + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.0.ready(interest).await + } + + /// Waits for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn writable(&self) -> io::Result<()> { + self.0.writable().await + } + + /// Tries to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + self.0.try_write(buf) + } + + /// Tries to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: Self::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.0.try_write_vectored(buf) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.0.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.0.local_addr() + } +} + impl AsyncRead for ReadHalf<'_> { fn poll_read( self: Pin<&mut Self>, diff --git a/src/net/unix/split_owned.rs b/src/net/unix/split_owned.rs index 3d6ac6a..9c3a2a4 100644 --- a/src/net/unix/split_owned.rs +++ b/src/net/unix/split_owned.rs @@ -8,9 +8,10 @@ //! split has no associated overhead and enforces all invariants at the type //! level. -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; use crate::net::UnixStream; +use crate::net::unix::SocketAddr; use std::error::Error; use std::net::Shutdown; use std::pin::Pin; @@ -18,6 +19,10 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::{fmt, io}; +cfg_io_util! { + use bytes::BufMut; +} + /// Owned read half of a [`UnixStream`], created by [`into_split`]. /// /// Reading from an `OwnedReadHalf` is usually done using the convenience methods found @@ -102,6 +107,124 @@ impl OwnedReadHalf { pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> { reunite(self, other) } + + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.inner.ready(interest).await + } + + /// Waits for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn readable(&self) -> io::Result<()> { + self.inner.readable().await + } + + /// Tries to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.try_read(buf) + } + + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.inner.try_read_buf(buf) + } + } + + /// Tries to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.inner.try_read_vectored(bufs) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.local_addr() + } } impl AsyncRead for OwnedReadHalf { @@ -124,13 +247,92 @@ impl OwnedWriteHalf { reunite(other, self) } - /// Destroy the write half, but don't close the write half of the stream + /// Destroys the write half, but don't close the write half of the stream /// until the read half is dropped. If the read half has already been /// dropped, this closes the stream. pub fn forget(mut self) { self.shutdown_on_drop = false; drop(self); } + + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.inner.ready(interest).await + } + + /// Waits for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn writable(&self) -> io::Result<()> { + self.inner.writable().await + } + + /// Tries to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + self.inner.try_write(buf) + } + + /// Tries to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: Self::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.inner.try_write_vectored(buf) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.local_addr() + } } impl Drop for OwnedWriteHalf { diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index 5837f36..4e7ef87 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -59,7 +59,7 @@ impl UnixStream { Ok(stream) } - /// Wait for any of the requested ready states. + /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_read()` or `try_write()`. It /// can be used to concurrently read / write to the same socket on a single @@ -133,7 +133,7 @@ impl UnixStream { Ok(event.ready) } - /// Wait for the socket to become readable. + /// Waits for the socket to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_read()`. @@ -290,7 +290,7 @@ impl UnixStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } - /// Try to read data from the stream into the provided buffers, returning + /// Tries to read data from the stream into the provided buffers, returning /// how many bytes were read. /// /// Data is copied to fill each buffer in order, with the final buffer @@ -369,7 +369,7 @@ impl UnixStream { } cfg_io_util! { - /// Try to read data from the stream into the provided buffer, advancing the + /// Tries to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. /// /// Receives any pending data from the socket but does not wait for new data @@ -449,7 +449,7 @@ impl UnixStream { } } - /// Wait for the socket to become writable. + /// Waits for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually /// paired with `try_write()`. @@ -535,7 +535,7 @@ impl UnixStream { self.io.registration().poll_write_ready(cx).map_ok(|_| ()) } - /// Try to write a buffer to the stream, returning how many bytes were + /// Tries to write a buffer to the stream, returning how many bytes were /// written. /// /// The function will attempt to write the entire contents of `buf`, but @@ -591,7 +591,7 @@ impl UnixStream { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } - /// Try to write several buffers to the stream, returning how many bytes + /// Tries to write several buffers to the stream, returning how many bytes /// were written. /// /// Data is written from each buffer in order, with the final buffer read @@ -653,7 +653,7 @@ impl UnixStream { .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } - /// Try to read or write from the socket using a user-provided IO operation. + /// Tries to read or write from the socket using a user-provided IO operation. /// /// If the socket is ready, the provided closure is called. The closure /// should attempt to perform IO operation from the socket by manually @@ -709,7 +709,7 @@ impl UnixStream { Ok(UnixStream { io }) } - /// Turn a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`]. + /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`]. /// /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking /// mode set as `true`. Use [`set_nonblocking`] to change the blocking @@ -773,11 +773,41 @@ impl UnixStream { } /// Returns the socket address of the local half of this connection. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// println!("{:?}", stream.local_addr()?); + /// # Ok(()) + /// # } + /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { self.io.local_addr().map(SocketAddr) } /// Returns the socket address of the remote half of this connection. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// println!("{:?}", stream.peer_addr()?); + /// # Ok(()) + /// # } + /// ``` pub fn peer_addr(&self) -> io::Result<SocketAddr> { self.io.peer_addr().map(SocketAddr) } @@ -804,7 +834,7 @@ impl UnixStream { // These lifetime markers also appear in the generated documentation, and make // it more clear that this is a *borrowed* split. #[allow(clippy::needless_lifetimes)] - /// Split a `UnixStream` into a read half and a write half, which can be used + /// Splits a `UnixStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. /// /// This method is more efficient than [`into_split`], but the halves cannot be diff --git a/src/net/unix/ucred.rs b/src/net/unix/ucred.rs index 49c7142..865303b 100644 --- a/src/net/unix/ucred.rs +++ b/src/net/unix/ucred.rs @@ -1,13 +1,13 @@ use libc::{gid_t, pid_t, uid_t}; -/// Credentials of a process +/// Credentials of a process. #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] pub struct UCred { - /// PID (process ID) of the process + /// PID (process ID) of the process. pid: Option<pid_t>, - /// UID (user ID) of the process + /// UID (user ID) of the process. uid: uid_t, - /// GID (group ID) of the process + /// GID (group ID) of the process. gid: gid_t, } diff --git a/src/net/windows/named_pipe.rs b/src/net/windows/named_pipe.rs index de6ab58..550fd4d 100644 --- a/src/net/windows/named_pipe.rs +++ b/src/net/windows/named_pipe.rs @@ -105,7 +105,7 @@ pub struct NamedPipeServer { } impl NamedPipeServer { - /// Construct a new named pipe server from the specified raw handle. + /// Constructs a new named pipe server from the specified raw handle. /// /// This function will consume ownership of the handle given, passing /// responsibility for closing the handle to the returned object. @@ -234,7 +234,7 @@ impl NamedPipeServer { self.io.disconnect() } - /// Wait for any of the requested ready states. + /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_read()` or `try_write()`. It /// can be used to concurrently read / write to the same pipe on a single @@ -301,7 +301,7 @@ impl NamedPipeServer { Ok(event.ready) } - /// Wait for the pipe to become readable. + /// Waits for the pipe to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_read()`. @@ -383,7 +383,7 @@ impl NamedPipeServer { self.io.registration().poll_read_ready(cx).map_ok(|_| ()) } - /// Try to read data from the pipe into the provided buffer, returning how + /// Tries to read data from the pipe into the provided buffer, returning how /// many bytes were read. /// /// Receives any pending data from the pipe but does not wait for new data @@ -450,7 +450,7 @@ impl NamedPipeServer { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } - /// Try to read data from the pipe into the provided buffers, returning + /// Tries to read data from the pipe into the provided buffers, returning /// how many bytes were read. /// /// Data is copied to fill each buffer in order, with the final buffer @@ -528,7 +528,7 @@ impl NamedPipeServer { .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) } - /// Wait for the pipe to become writable. + /// Waits for the pipe to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually /// paired with `try_write()`. @@ -606,7 +606,7 @@ impl NamedPipeServer { self.io.registration().poll_write_ready(cx).map_ok(|_| ()) } - /// Try to write a buffer to the pipe, returning how many bytes were + /// Tries to write a buffer to the pipe, returning how many bytes were /// written. /// /// The function will attempt to write the entire contents of `buf`, but @@ -662,7 +662,7 @@ impl NamedPipeServer { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } - /// Try to write several buffers to the pipe, returning how many bytes + /// Tries to write several buffers to the pipe, returning how many bytes /// were written. /// /// Data is written from each buffer in order, with the final buffer read @@ -724,7 +724,7 @@ impl NamedPipeServer { .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } - /// Try to read or write from the socket using a user-provided IO operation. + /// Tries to read or write from the socket using a user-provided IO operation. /// /// If the socket is ready, the provided closure is called. The closure /// should attempt to perform IO operation from the socket by manually @@ -846,7 +846,7 @@ pub struct NamedPipeClient { } impl NamedPipeClient { - /// Construct a new named pipe client from the specified raw handle. + /// Constructs a new named pipe client from the specified raw handle. /// /// This function will consume ownership of the handle given, passing /// responsibility for closing the handle to the returned object. @@ -896,7 +896,7 @@ impl NamedPipeClient { unsafe { named_pipe_info(self.io.as_raw_handle()) } } - /// Wait for any of the requested ready states. + /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_read()` or `try_write()`. It /// can be used to concurrently read / write to the same pipe on a single @@ -962,7 +962,7 @@ impl NamedPipeClient { Ok(event.ready) } - /// Wait for the pipe to become readable. + /// Waits for the pipe to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_read()`. @@ -1043,7 +1043,7 @@ impl NamedPipeClient { self.io.registration().poll_read_ready(cx).map_ok(|_| ()) } - /// Try to read data from the pipe into the provided buffer, returning how + /// Tries to read data from the pipe into the provided buffer, returning how /// many bytes were read. /// /// Receives any pending data from the pipe but does not wait for new data @@ -1109,7 +1109,7 @@ impl NamedPipeClient { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } - /// Try to read data from the pipe into the provided buffers, returning + /// Tries to read data from the pipe into the provided buffers, returning /// how many bytes were read. /// /// Data is copied to fill each buffer in order, with the final buffer @@ -1186,7 +1186,7 @@ impl NamedPipeClient { .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) } - /// Wait for the pipe to become writable. + /// Waits for the pipe to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually /// paired with `try_write()`. @@ -1263,7 +1263,7 @@ impl NamedPipeClient { self.io.registration().poll_write_ready(cx).map_ok(|_| ()) } - /// Try to write a buffer to the pipe, returning how many bytes were + /// Tries to write a buffer to the pipe, returning how many bytes were /// written. /// /// The function will attempt to write the entire contents of `buf`, but @@ -1318,7 +1318,7 @@ impl NamedPipeClient { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } - /// Try to write several buffers to the pipe, returning how many bytes + /// Tries to write several buffers to the pipe, returning how many bytes /// were written. /// /// Data is written from each buffer in order, with the final buffer read @@ -1379,7 +1379,7 @@ impl NamedPipeClient { .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } - /// Try to read or write from the socket using a user-provided IO operation. + /// Tries to read or write from the socket using a user-provided IO operation. /// /// If the socket is ready, the provided closure is called. The closure /// should attempt to perform IO operation from the socket by manually @@ -1882,7 +1882,7 @@ impl ServerOptions { self } - /// Create the named pipe identified by `addr` for use as a server. + /// Creates the named pipe identified by `addr` for use as a server. /// /// This uses the [`CreateNamedPipe`] function. /// @@ -1913,7 +1913,7 @@ impl ServerOptions { unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) } } - /// Create the named pipe identified by `addr` for use as a server. + /// Creates the named pipe identified by `addr` for use as a server. /// /// This is the same as [`create`] except that it supports providing the raw /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed @@ -2042,7 +2042,7 @@ impl ClientOptions { self } - /// Open the named pipe identified by `addr`. + /// Opens the named pipe identified by `addr`. /// /// This opens the client using [`CreateFile`] with the /// `dwCreationDisposition` option set to `OPEN_EXISTING`. @@ -2099,7 +2099,7 @@ impl ClientOptions { unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) } } - /// Open the named pipe identified by `addr`. + /// Opens the named pipe identified by `addr`. /// /// This is the same as [`open`] except that it supports providing the raw /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed @@ -2201,7 +2201,7 @@ pub struct PipeInfo { pub in_buffer_size: u32, } -/// Encode an address so that it is a null-terminated wide string. +/// Encodes an address so that it is a null-terminated wide string. fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> { let len = addr.as_ref().encode_wide().count(); let mut vec = Vec::with_capacity(len + 1); diff --git a/src/park/mod.rs b/src/park/mod.rs index edd9371..87d04ff 100644 --- a/src/park/mod.rs +++ b/src/park/mod.rs @@ -45,12 +45,12 @@ use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; -/// Block the current thread. +/// Blocks the current thread. pub(crate) trait Park { /// Unpark handle type for the `Park` implementation. type Unpark: Unpark; - /// Error returned by `park` + /// Error returned by `park`. type Error: Debug; /// Gets a new `Unpark` handle associated with this `Park` instance. @@ -66,7 +66,7 @@ pub(crate) trait Park { /// /// This function **should** not panic, but ultimately, panics are left as /// an implementation detail. Refer to the documentation for the specific - /// `Park` implementation + /// `Park` implementation. fn park(&mut self) -> Result<(), Self::Error>; /// Parks the current thread for at most `duration`. @@ -82,10 +82,10 @@ pub(crate) trait Park { /// /// This function **should** not panic, but ultimately, panics are left as /// an implementation detail. Refer to the documentation for the specific - /// `Park` implementation + /// `Park` implementation. fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>; - /// Release all resources holded by the parker for proper leak-free shutdown + /// Releases all resources holded by the parker for proper leak-free shutdown. fn shutdown(&mut self); } @@ -100,7 +100,7 @@ pub(crate) trait Unpark: Sync + Send + 'static { /// /// This function **should** not panic, but ultimately, panics are left as /// an implementation detail. Refer to the documentation for the specific - /// `Unpark` implementation + /// `Unpark` implementation. fn unpark(&self); } diff --git a/src/park/thread.rs b/src/park/thread.rs index 2725e45..27ce202 100644 --- a/src/park/thread.rs +++ b/src/park/thread.rs @@ -76,7 +76,7 @@ impl Park for ParkThread { // ==== impl Inner ==== impl Inner { - /// Park the current thread for at most `dur`. + /// Parks the current thread for at most `dur`. fn park(&self) { // If we were previously notified then we consume this notification and // return quickly. @@ -227,7 +227,7 @@ pub(crate) struct CachedParkThread { } impl CachedParkThread { - /// Create a new `ParkThread` handle for the current thread. + /// Creates a new `ParkThread` handle for the current thread. /// /// This type cannot be moved to other threads, so it should be created on /// the thread that the caller intends to park. @@ -241,7 +241,7 @@ impl CachedParkThread { self.with_current(|park_thread| park_thread.unpark()) } - /// Get a reference to the `ParkThread` handle for this thread. + /// Gets a reference to the `ParkThread` handle for this thread. fn with_current<F, R>(&self, f: F) -> Result<R, ParkError> where F: FnOnce(&ParkThread) -> R, diff --git a/src/process/mod.rs b/src/process/mod.rs index 7a15024..6eeefdb 100644 --- a/src/process/mod.rs +++ b/src/process/mod.rs @@ -578,7 +578,7 @@ impl Command { self } - /// Set executable argument + /// Sets executable argument. /// /// Set the first process argument, `argv[0]`, to something other than the /// default executable path. @@ -1173,7 +1173,7 @@ pub struct ChildStderr { } impl ChildStdin { - /// Create an asynchronous `ChildStdin` from a synchronous one. + /// Creates an asynchronous `ChildStdin` from a synchronous one. /// /// # Errors /// @@ -1188,7 +1188,7 @@ impl ChildStdin { } impl ChildStdout { - /// Create an asynchronous `ChildStderr` from a synchronous one. + /// Creates an asynchronous `ChildStderr` from a synchronous one. /// /// # Errors /// @@ -1203,7 +1203,7 @@ impl ChildStdout { } impl ChildStderr { - /// Create an asynchronous `ChildStderr` from a synchronous one. + /// Creates an asynchronous `ChildStderr` from a synchronous one. /// /// # Errors /// diff --git a/src/process/unix/driver.rs b/src/process/unix/driver.rs index 43b2efa..84dc8fb 100644 --- a/src/process/unix/driver.rs +++ b/src/process/unix/driver.rs @@ -1,6 +1,6 @@ #![cfg_attr(not(feature = "rt"), allow(dead_code))] -//! Process driver +//! Process driver. use crate::park::Park; use crate::process::unix::GlobalOrphanQueue; diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs index 0f379c9..576fe6c 100644 --- a/src/process/unix/mod.rs +++ b/src/process/unix/mod.rs @@ -1,4 +1,4 @@ -//! Unix handling of child processes +//! Unix handling of child processes. //! //! Right now the only "fancy" thing about this is how we implement the //! `Future` implementation on `Child` to get the exit status. Unix offers diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs index e37d872..872d0d5 100644 --- a/src/runtime/basic_scheduler.rs +++ b/src/runtime/basic_scheduler.rs @@ -2,6 +2,7 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; +use crate::runtime::context::EnterGuard; use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::runtime::Callback; @@ -12,7 +13,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::sync::atomic::Ordering::{AcqRel, Release}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::time::Duration; @@ -29,6 +30,12 @@ pub(crate) struct BasicScheduler<P: Park> { /// Sendable task spawner spawner: Spawner, + + /// This is usually None, but right before dropping the BasicScheduler, it + /// is changed to `Some` with the context being the runtime's own context. + /// This ensures that any tasks dropped in the `BasicScheduler`s destructor + /// run in that runtime's context. + context_guard: Option<EnterGuard>, } /// The inner scheduler that owns the task queue and the main parker P. @@ -160,6 +167,7 @@ impl<P: Park> BasicScheduler<P> { inner, notify: Notify::new(), spawner, + context_guard: None, } } @@ -210,22 +218,24 @@ impl<P: Park> BasicScheduler<P> { basic_scheduler: self, }) } + + pub(super) fn set_context_guard(&mut self, guard: EnterGuard) { + self.context_guard = Some(guard); + } } impl<P: Park> Inner<P> { - /// Block on the future provided and drive the runtime's driver. + /// Blocks on the provided future and drives the runtime's driver. fn block_on<F: Future>(&mut self, future: F) -> F::Output { enter(self, |scheduler, context| { let _enter = crate::runtime::enter(false); let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); - let mut polled = false; pin!(future); 'outer: loop { - if scheduler.spawner.was_woken() || !polled { - polled = true; + if scheduler.spawner.reset_woken() { scheduler.stats.incr_poll_count(); if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { return v; @@ -301,8 +311,8 @@ impl<P: Park> Inner<P> { } } -/// Enter the scheduler context. This sets the queue and other necessary -/// scheduler state in the thread-local +/// Enters the scheduler context. This sets the queue and other necessary +/// scheduler state in the thread-local. fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R where F: FnOnce(&mut Inner<P>, &Context) -> R, @@ -418,13 +428,15 @@ impl Spawner { } fn waker_ref(&self) -> WakerRef<'_> { - // clear the woken bit - self.shared.woken.swap(false, AcqRel); + // Set woken to true when enter block_on, ensure outer future + // be polled for the first time when enter loop + self.shared.woken.store(true, Release); waker_ref(&self.shared) } - fn was_woken(&self) -> bool { - self.shared.woken.load(Acquire) + // reset woken to false and return original value + pub(crate) fn reset_woken(&self) -> bool { + self.shared.woken.swap(false, AcqRel) } } diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 0c23bb0..77ab495 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -8,7 +8,6 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; -use crate::util::error::CONTEXT_MISSING_ERROR; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -25,28 +24,28 @@ pub(crate) struct Spawner { } struct Inner { - /// State shared between worker threads + /// State shared between worker threads. shared: Mutex<Shared>, /// Pool threads wait on this. condvar: Condvar, - /// Spawned threads use this name + /// Spawned threads use this name. thread_name: ThreadNameFn, - /// Spawned thread stack size + /// Spawned thread stack size. stack_size: Option<usize>, - /// Call after a thread starts + /// Call after a thread starts. after_start: Option<Callback>, - /// Call before a thread stops + /// Call before a thread stops. before_stop: Option<Callback>, - // Maximum number of threads + // Maximum number of threads. thread_cap: usize, - // Customizable wait timeout + // Customizable wait timeout. keep_alive: Duration, } @@ -67,7 +66,7 @@ struct Shared { /// calling shutdown handles joining on these. worker_threads: HashMap<usize, thread::JoinHandle<()>>, /// This is a counter used to iterate worker_threads in a consistent order (for loom's - /// benefit) + /// benefit). worker_thread_index: usize, } @@ -75,13 +74,13 @@ type Task = task::UnownedTask<NoopSchedule>; const KEEP_ALIVE: Duration = Duration::from_secs(10); -/// Run the provided function on an executor dedicated to blocking operations. +/// Runs the provided function on an executor dedicated to blocking operations. pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let rt = context::current().expect(CONTEXT_MISSING_ERROR); + let rt = context::current(); rt.spawn_blocking(func) } diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs index 0cf2285..e6f4674 100644 --- a/src/runtime/blocking/shutdown.rs +++ b/src/runtime/blocking/shutdown.rs @@ -10,7 +10,7 @@ use std::time::Duration; #[derive(Debug, Clone)] pub(super) struct Sender { - tx: Arc<oneshot::Sender<()>>, + _tx: Arc<oneshot::Sender<()>>, } #[derive(Debug)] @@ -20,7 +20,7 @@ pub(super) struct Receiver { pub(super) fn channel() -> (Sender, Receiver) { let (tx, rx) = oneshot::channel(); - let tx = Sender { tx: Arc::new(tx) }; + let tx = Sender { _tx: Arc::new(tx) }; let rx = Receiver { rx }; (tx, rx) diff --git a/src/runtime/blocking/task.rs b/src/runtime/blocking/task.rs index ee2d8d6..0b7803a 100644 --- a/src/runtime/blocking/task.rs +++ b/src/runtime/blocking/task.rs @@ -2,13 +2,13 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -/// Converts a function to a future that completes on poll +/// Converts a function to a future that completes on poll. pub(crate) struct BlockingTask<T> { func: Option<T>, } impl<T> BlockingTask<T> { - /// Initializes a new blocking task from the given function + /// Initializes a new blocking task from the given function. pub(crate) fn new(func: T) -> BlockingTask<T> { BlockingTask { func: Some(func) } } diff --git a/src/runtime/context.rs b/src/runtime/context.rs index a727ed4..1f44a53 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -1,5 +1,5 @@ //! Thread local runtime context -use crate::runtime::Handle; +use crate::runtime::{Handle, TryCurrentError}; use std::cell::RefCell; @@ -7,58 +7,96 @@ thread_local! { static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None) } -pub(crate) fn current() -> Option<Handle> { - CONTEXT.with(|ctx| ctx.borrow().clone()) +pub(crate) fn try_current() -> Result<Handle, crate::runtime::TryCurrentError> { + match CONTEXT.try_with(|ctx| ctx.borrow().clone()) { + Ok(Some(handle)) => Ok(handle), + Ok(None) => Err(TryCurrentError::new_no_context()), + Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), + } +} + +pub(crate) fn current() -> Handle { + match try_current() { + Ok(handle) => handle, + Err(e) => panic!("{}", e), + } } cfg_io_driver! { pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { - CONTEXT.with(|ctx| { + match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone() - }) + }) { + Ok(io_handle) => io_handle, + Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + } } } cfg_signal_internal! { #[cfg(unix)] pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { - CONTEXT.with(|ctx| { + match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone() - }) + }) { + Ok(signal_handle) => signal_handle, + Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + } } } cfg_time! { pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { - CONTEXT.with(|ctx| { + match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone() - }) + }) { + Ok(time_handle) => time_handle, + Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + } } cfg_test_util! { pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> { - CONTEXT.with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) + match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) { + Ok(clock) => clock, + Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + } } } } cfg_rt! { pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> { - CONTEXT.with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) + match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) { + Ok(spawner) => spawner, + Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + } } } -/// Set this [`Handle`] as the current active [`Handle`]. +/// Sets this [`Handle`] as the current active [`Handle`]. /// /// [`Handle`]: Handle pub(crate) fn enter(new: Handle) -> EnterGuard { - CONTEXT.with(|ctx| { - let old = ctx.borrow_mut().replace(new); - EnterGuard(old) - }) + match try_enter(new) { + Some(guard) => guard, + None => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), + } +} + +/// Sets this [`Handle`] as the current active [`Handle`]. +/// +/// [`Handle`]: Handle +pub(crate) fn try_enter(new: Handle) -> Option<EnterGuard> { + CONTEXT + .try_with(|ctx| { + let old = ctx.borrow_mut().replace(new); + EnterGuard(old) + }) + .ok() } #[derive(Debug)] diff --git a/src/runtime/enter.rs b/src/runtime/enter.rs index e91408f..3f14cb5 100644 --- a/src/runtime/enter.rs +++ b/src/runtime/enter.rs @@ -92,7 +92,7 @@ cfg_rt_multi_thread! { } cfg_rt! { - /// Disallow blocking in the current runtime context until the guard is dropped. + /// Disallows blocking in the current runtime context until the guard is dropped. pub(crate) fn disallow_blocking() -> DisallowBlockingGuard { let reset = ENTERED.with(|c| { if let EnterContext::Entered { diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs index bad6a00..cd1cb76 100644 --- a/src/runtime/handle.rs +++ b/src/runtime/handle.rs @@ -1,9 +1,10 @@ use crate::runtime::blocking::{BlockingTask, NoopSchedule}; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{blocking, context, driver, Spawner}; -use crate::util::error::CONTEXT_MISSING_ERROR; +use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; use std::future::Future; +use std::marker::PhantomData; use std::{error, fmt}; /// Handle to the runtime. @@ -17,15 +18,25 @@ pub struct Handle { pub(super) spawner: Spawner, /// Handles to the I/O drivers + #[cfg_attr( + not(any(feature = "net", feature = "process", all(unix, feature = "signal"))), + allow(dead_code) + )] pub(super) io_handle: driver::IoHandle, /// Handles to the signal drivers + #[cfg_attr( + not(any(feature = "signal", all(unix, feature = "process"))), + allow(dead_code) + )] pub(super) signal_handle: driver::SignalHandle, /// Handles to the time drivers + #[cfg_attr(not(feature = "time"), allow(dead_code))] pub(super) time_handle: driver::TimeHandle, /// Source of `Instant::now()` + #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))] pub(super) clock: driver::Clock, /// Blocking pool spawner @@ -41,12 +52,12 @@ pub struct Handle { #[derive(Debug)] #[must_use = "Creating and dropping a guard does nothing"] pub struct EnterGuard<'a> { - handle: &'a Handle, - guard: context::EnterGuard, + _guard: context::EnterGuard, + _handle_lifetime: PhantomData<&'a Handle>, } impl Handle { - /// Enter the runtime context. This allows you to construct types that must + /// Enters the runtime context. This allows you to construct types that must /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. /// It will also allow you to call methods such as [`tokio::spawn`]. /// @@ -55,12 +66,12 @@ impl Handle { /// [`tokio::spawn`]: fn@crate::spawn pub fn enter(&self) -> EnterGuard<'_> { EnterGuard { - handle: self, - guard: context::enter(self.clone()), + _guard: context::enter(self.clone()), + _handle_lifetime: PhantomData, } } - /// Returns a `Handle` view over the currently running `Runtime` + /// Returns a `Handle` view over the currently running `Runtime`. /// /// # Panic /// @@ -99,7 +110,7 @@ impl Handle { /// # } /// ``` pub fn current() -> Self { - context::current().expect(CONTEXT_MISSING_ERROR) + context::current() } /// Returns a Handle view over the currently running Runtime @@ -108,7 +119,7 @@ impl Handle { /// /// Contrary to `current`, this never panics pub fn try_current() -> Result<Self, TryCurrentError> { - context::current().ok_or(TryCurrentError(())) + context::try_current() } cfg_stats! { @@ -119,7 +130,7 @@ impl Handle { } } - /// Spawn a future onto the Tokio runtime. + /// Spawns a future onto the Tokio runtime. /// /// This spawns the given future onto the runtime's executor, usually a /// thread pool. The thread pool is then responsible for polling the future @@ -157,7 +168,7 @@ impl Handle { self.spawner.spawn(future) } - /// Run the provided function on an executor dedicated to blocking + /// Runs the provided function on an executor dedicated to blocking. /// operations. /// /// # Examples @@ -182,7 +193,11 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - self.spawn_blocking_inner(func, None) + if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 { + self.spawn_blocking_inner(Box::new(func), None) + } else { + self.spawn_blocking_inner(func, None) + } } #[cfg_attr(tokio_track_caller, track_caller)] @@ -226,7 +241,7 @@ impl Handle { handle } - /// Run a future to completion on this `Handle`'s associated `Runtime`. + /// Runs a future to completion on this `Handle`'s associated `Runtime`. /// /// This runs the given future on the current thread, blocking until it is /// complete, and yielding its resolved result. Any tasks or timers which @@ -319,17 +334,60 @@ impl Handle { } /// Error returned by `try_current` when no Runtime has been started -pub struct TryCurrentError(()); +#[derive(Debug)] +pub struct TryCurrentError { + kind: TryCurrentErrorKind, +} + +impl TryCurrentError { + pub(crate) fn new_no_context() -> Self { + Self { + kind: TryCurrentErrorKind::NoContext, + } + } + + pub(crate) fn new_thread_local_destroyed() -> Self { + Self { + kind: TryCurrentErrorKind::ThreadLocalDestroyed, + } + } -impl fmt::Debug for TryCurrentError { + /// Returns true if the call failed because there is currently no runtime in + /// the Tokio context. + pub fn is_missing_context(&self) -> bool { + matches!(self.kind, TryCurrentErrorKind::NoContext) + } + + /// Returns true if the call failed because the Tokio context thread-local + /// had been destroyed. This can usually only happen if in the destructor of + /// other thread-locals. + pub fn is_thread_local_destroyed(&self) -> bool { + matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed) + } +} + +enum TryCurrentErrorKind { + NoContext, + ThreadLocalDestroyed, +} + +impl fmt::Debug for TryCurrentErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryCurrentError").finish() + use TryCurrentErrorKind::*; + match self { + NoContext => f.write_str("NoContext"), + ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"), + } } } impl fmt::Display for TryCurrentError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(CONTEXT_MISSING_ERROR) + use TryCurrentErrorKind::*; + match self.kind { + NoContext => f.write_str(CONTEXT_MISSING_ERROR), + ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR), + } } } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index ec7d0c0..96bb47c 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -205,7 +205,7 @@ cfg_rt! { use self::enter::enter; mod handle; - pub use handle::{EnterGuard, Handle}; + pub use handle::{EnterGuard, Handle, TryCurrentError}; mod spawner; use self::spawner::Spawner; @@ -294,7 +294,7 @@ cfg_rt! { type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; impl Runtime { - /// Create a new runtime instance with default configuration values. + /// Creates a new runtime instance with default configuration values. /// /// This results in the multi threaded scheduler, I/O driver, and time driver being /// initialized. @@ -329,7 +329,7 @@ cfg_rt! { Builder::new_multi_thread().enable_all().build() } - /// Return a handle to the runtime's spawner. + /// Returns a handle to the runtime's spawner. /// /// The returned handle can be used to spawn tasks that run on this runtime, and can /// be cloned to allow moving the `Handle` to other threads. @@ -350,7 +350,7 @@ cfg_rt! { &self.handle } - /// Spawn a future onto the Tokio runtime. + /// Spawns a future onto the Tokio runtime. /// /// This spawns the given future onto the runtime's executor, usually a /// thread pool. The thread pool is then responsible for polling the future @@ -384,7 +384,7 @@ cfg_rt! { self.handle.spawn(future) } - /// Run the provided function on an executor dedicated to blocking operations. + /// Runs the provided function on an executor dedicated to blocking operations. /// /// # Examples /// @@ -409,7 +409,7 @@ cfg_rt! { self.handle.spawn_blocking(func) } - /// Run a future to completion on the Tokio runtime. This is the + /// Runs a future to completion on the Tokio runtime. This is the /// runtime's entry point. /// /// This runs the given future on the current thread, blocking until it is @@ -464,7 +464,7 @@ cfg_rt! { } } - /// Enter the runtime context. + /// Enters the runtime context. /// /// This allows you to construct types that must have an executor /// available on creation such as [`Sleep`] or [`TcpStream`]. It will @@ -500,7 +500,7 @@ cfg_rt! { self.handle.enter() } - /// Shutdown the runtime, waiting for at most `duration` for all spawned + /// Shuts down the runtime, waiting for at most `duration` for all spawned /// task to shutdown. /// /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to @@ -537,11 +537,11 @@ cfg_rt! { /// ``` pub fn shutdown_timeout(mut self, duration: Duration) { // Wakeup and shutdown all the worker threads - self.handle.shutdown(); + self.handle.clone().shutdown(); self.blocking_pool.shutdown(Some(duration)); } - /// Shutdown the runtime, without waiting for any spawned tasks to shutdown. + /// Shuts down the runtime, without waiting for any spawned tasks to shutdown. /// /// This can be useful if you want to drop a runtime from within another runtime. /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks @@ -571,4 +571,30 @@ cfg_rt! { self.shutdown_timeout(Duration::from_nanos(0)) } } + + #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let + impl Drop for Runtime { + fn drop(&mut self) { + match &mut self.kind { + Kind::CurrentThread(basic) => { + // This ensures that tasks spawned on the basic runtime are dropped inside the + // runtime's context. + match self::context::try_enter(self.handle.clone()) { + Some(guard) => basic.set_context_guard(guard), + None => { + // The context thread-local has alread been destroyed. + // + // We don't set the guard in this case. Calls to tokio::spawn in task + // destructors would fail regardless if this happens. + }, + } + }, + #[cfg(feature = "rt-multi-thread")] + Kind::ThreadPool(_) => { + // The threaded scheduler drops its tasks on its worker threads, which is + // already in the runtime's context. + }, + } + } + } } diff --git a/src/runtime/stats/stats.rs b/src/runtime/stats/stats.rs index 39a48ae..b2bcacc 100644 --- a/src/runtime/stats/stats.rs +++ b/src/runtime/stats/stats.rs @@ -1,6 +1,9 @@ //! This file contains the types necessary to collect various types of stats. use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed}; +use std::convert::TryFrom; +use std::time::{Duration, Instant}; + /// This type contains methods to retrieve stats from a Tokio runtime. #[derive(Debug)] pub struct RuntimeStats { @@ -14,6 +17,7 @@ pub struct WorkerStats { park_count: AtomicU64, steal_count: AtomicU64, poll_count: AtomicU64, + busy_duration_total: AtomicU64, } impl RuntimeStats { @@ -24,6 +28,7 @@ impl RuntimeStats { park_count: AtomicU64::new(0), steal_count: AtomicU64::new(0), poll_count: AtomicU64::new(0), + busy_duration_total: AtomicU64::new(0), }); } @@ -54,6 +59,11 @@ impl WorkerStats { pub fn poll_count(&self) -> u64 { self.poll_count.load(Relaxed) } + + /// Returns the total amount of time this worker has been busy for. + pub fn total_busy_duration(&self) -> Duration { + Duration::from_nanos(self.busy_duration_total.load(Relaxed)) + } } pub(crate) struct WorkerStatsBatcher { @@ -61,6 +71,9 @@ pub(crate) struct WorkerStatsBatcher { park_count: u64, steal_count: u64, poll_count: u64, + /// The total busy duration in nanoseconds. + busy_duration_total: u64, + last_resume_time: Instant, } impl WorkerStatsBatcher { @@ -70,6 +83,8 @@ impl WorkerStatsBatcher { park_count: 0, steal_count: 0, poll_count: 0, + busy_duration_total: 0, + last_resume_time: Instant::now(), } } pub(crate) fn submit(&mut self, to: &RuntimeStats) { @@ -78,13 +93,23 @@ impl WorkerStatsBatcher { worker.park_count.store(self.park_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); worker.poll_count.store(self.poll_count, Relaxed); + + worker + .busy_duration_total + .store(self.busy_duration_total, Relaxed); } pub(crate) fn about_to_park(&mut self) { self.park_count += 1; + + let busy_duration = self.last_resume_time.elapsed(); + let busy_duration = u64::try_from(busy_duration.as_nanos()).unwrap_or(u64::MAX); + self.busy_duration_total += busy_duration; } - pub(crate) fn returned_from_park(&mut self) {} + pub(crate) fn returned_from_park(&mut self) { + self.last_resume_time = Instant::now(); + } #[cfg(feature = "rt-multi-thread")] pub(crate) fn incr_steal_count(&mut self, by: u16) { diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs index 51b6496..776e834 100644 --- a/src/runtime/task/core.rs +++ b/src/runtime/task/core.rs @@ -44,22 +44,22 @@ pub(super) struct CoreStage<T: Future> { /// /// Holds the future or output, depending on the stage of execution. pub(super) struct Core<T: Future, S> { - /// Scheduler used to drive this future + /// Scheduler used to drive this future. pub(super) scheduler: S, - /// Either the future or the output + /// Either the future or the output. pub(super) stage: CoreStage<T>, } /// Crate public as this is also needed by the pool. #[repr(C)] pub(crate) struct Header { - /// Task state + /// Task state. pub(super) state: State, pub(super) owned: UnsafeCell<linked_list::Pointers<Header>>, - /// Pointer to next task, used with the injection queue + /// Pointer to next task, used with the injection queue. pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>, /// Table of function pointers for executing actions on the task. @@ -133,7 +133,7 @@ impl<T: Future> CoreStage<T> { self.stage.with_mut(f) } - /// Poll the future + /// Polls the future. /// /// # Safety /// @@ -169,7 +169,7 @@ impl<T: Future> CoreStage<T> { res } - /// Drop the future + /// Drops the future. /// /// # Safety /// @@ -181,7 +181,7 @@ impl<T: Future> CoreStage<T> { } } - /// Store the task output + /// Stores the task output. /// /// # Safety /// @@ -193,7 +193,7 @@ impl<T: Future> CoreStage<T> { } } - /// Take the task output + /// Takes the task output. /// /// # Safety /// diff --git a/src/runtime/task/error.rs b/src/runtime/task/error.rs index 17fb093..1a8129b 100644 --- a/src/runtime/task/error.rs +++ b/src/runtime/task/error.rs @@ -29,12 +29,12 @@ impl JoinError { } } - /// Returns true if the error was caused by the task being cancelled + /// Returns true if the error was caused by the task being cancelled. pub fn is_cancelled(&self) -> bool { matches!(&self.repr, Repr::Cancelled) } - /// Returns true if the error was caused by the task panicking + /// Returns true if the error was caused by the task panicking. /// /// # Examples /// diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs index 41b4193..0996e52 100644 --- a/src/runtime/task/harness.rs +++ b/src/runtime/task/harness.rs @@ -10,7 +10,7 @@ use std::panic; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; -/// Typed raw task handle +/// Typed raw task handle. pub(super) struct Harness<T: Future, S: 'static> { cell: NonNull<Cell<T, S>>, } @@ -74,7 +74,7 @@ where } } - /// Poll the task and cancel it if necessary. This takes ownership of a + /// Polls the task and cancel it if necessary. This takes ownership of a /// ref-count. /// /// If the return value is Notified, the caller is given ownership of two @@ -124,7 +124,7 @@ where } } - /// Forcibly shutdown the task + /// Forcibly shuts down the task. /// /// Attempt to transition to `Running` in order to forcibly shutdown the /// task. If the task is currently running or in a state of completion, then @@ -192,7 +192,7 @@ where } } - /// Remotely abort the task. + /// Remotely aborts the task. /// /// The caller should hold a ref-count, but we do not consume it. /// @@ -280,7 +280,7 @@ where // ====== internal ====== - /// Complete the task. This method assumes that the state is RUNNING. + /// Completes the task. This method assumes that the state is RUNNING. fn complete(self) { // The future has completed and its output has been written to the task // stage. We transition from running to complete. @@ -310,7 +310,7 @@ where } } - /// Release the task from the scheduler. Returns the number of ref-counts + /// Releases the task from the scheduler. Returns the number of ref-counts /// that should be decremented. fn release(&self) -> usize { // We don't actually increment the ref-count here, but the new task is @@ -325,7 +325,7 @@ where } } - /// Create a new task that holds its own ref-count. + /// Creates a new task that holds its own ref-count. /// /// # Safety /// @@ -425,7 +425,7 @@ enum PollFuture { Dealloc, } -/// Cancel the task and store the appropriate error in the stage field. +/// Cancels the task and store the appropriate error in the stage field. fn cancel_task<T: Future>(stage: &CoreStage<T>) { // Drop the future from a panic guard. let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { @@ -442,7 +442,7 @@ fn cancel_task<T: Future>(stage: &CoreStage<T>) { } } -/// Poll the future. If the future completes, the output is written to the +/// Polls the future. If the future completes, the output is written to the /// stage field. fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> { // Poll the future. diff --git a/src/runtime/task/inject.rs b/src/runtime/task/inject.rs index d1f0aee..1585e13 100644 --- a/src/runtime/task/inject.rs +++ b/src/runtime/task/inject.rs @@ -11,7 +11,7 @@ use std::sync::atomic::Ordering::{Acquire, Release}; /// Growable, MPMC queue used to inject new tasks into the scheduler and as an /// overflow queue when the local, fixed-size, array queue overflows. pub(crate) struct Inject<T: 'static> { - /// Pointers to the head and tail of the queue + /// Pointers to the head and tail of the queue. pointers: Mutex<Pointers>, /// Number of pending tasks in the queue. This helps prevent unnecessary @@ -22,13 +22,13 @@ pub(crate) struct Inject<T: 'static> { } struct Pointers { - /// True if the queue is closed + /// True if the queue is closed. is_closed: bool, - /// Linked-list head + /// Linked-list head. head: Option<NonNull<task::Header>>, - /// Linked-list tail + /// Linked-list tail. tail: Option<NonNull<task::Header>>, } @@ -52,7 +52,7 @@ impl<T: 'static> Inject<T> { self.len() == 0 } - /// Close the injection queue, returns `true` if the queue is open when the + /// Closes the injection queue, returns `true` if the queue is open when the /// transition is made. pub(crate) fn close(&self) -> bool { let mut p = self.pointers.lock(); @@ -137,7 +137,7 @@ impl<T: 'static> Inject<T> { self.push_batch_inner(first, prev, counter); } - /// Insert several tasks that have been linked together into the queue. + /// Inserts several tasks that have been linked together into the queue. /// /// The provided head and tail may be be the same task. In this case, a /// single task is inserted. diff --git a/src/runtime/task/list.rs b/src/runtime/task/list.rs index edd3c4f..7758f8d 100644 --- a/src/runtime/task/list.rs +++ b/src/runtime/task/list.rs @@ -78,7 +78,7 @@ impl<S: 'static> OwnedTasks<S> { } } - /// Bind the provided task to this OwnedTasks instance. This fails if the + /// Binds the provided task to this OwnedTasks instance. This fails if the /// OwnedTasks has been closed. pub(crate) fn bind<T>( &self, @@ -110,7 +110,7 @@ impl<S: 'static> OwnedTasks<S> { } } - /// Assert that the given task is owned by this OwnedTasks and convert it to + /// Asserts that the given task is owned by this OwnedTasks and convert it to /// a LocalNotified, giving the thread permission to poll this task. #[inline] pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> { @@ -124,7 +124,7 @@ impl<S: 'static> OwnedTasks<S> { } } - /// Shut down all tasks in the collection. This call also closes the + /// Shuts down all tasks in the collection. This call also closes the /// collection, preventing new items from being added. pub(crate) fn close_and_shutdown_all(&self) where @@ -213,7 +213,7 @@ impl<S: 'static> LocalOwnedTasks<S> { } } - /// Shut down all tasks in the collection. This call also closes the + /// Shuts down all tasks in the collection. This call also closes the /// collection, preventing new items from being added. pub(crate) fn close_and_shutdown_all(&self) where @@ -241,7 +241,7 @@ impl<S: 'static> LocalOwnedTasks<S> { unsafe { inner.list.remove(task.header().into()) }) } - /// Assert that the given task is owned by this LocalOwnedTasks and convert + /// Asserts that the given task is owned by this LocalOwnedTasks and convert /// it to a LocalNotified, giving the thread permission to poll this task. #[inline] pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> { diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index 5f0d5ab..1f18209 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -173,7 +173,7 @@ use std::marker::PhantomData; use std::ptr::NonNull; use std::{fmt, mem}; -/// An owned handle to the task, tracked by ref count +/// An owned handle to the task, tracked by ref count. #[repr(transparent)] pub(crate) struct Task<S: 'static> { raw: RawTask, @@ -211,7 +211,7 @@ pub(crate) struct UnownedTask<S: 'static> { unsafe impl<S> Send for UnownedTask<S> {} unsafe impl<S> Sync for UnownedTask<S> {} -/// Task result sent back +/// Task result sent back. pub(crate) type Result<T> = std::result::Result<T, JoinError>; pub(crate) trait Schedule: Sync + Sized + 'static { @@ -260,7 +260,7 @@ cfg_rt! { (task, notified, join) } - /// Create a new task with an associated join handle. This method is used + /// Creates a new task with an associated join handle. This method is used /// only when the task is not going to be stored in an `OwnedTasks` list. /// /// Currently only blocking tasks use this method. @@ -327,7 +327,7 @@ cfg_rt_multi_thread! { } impl<S: Schedule> Task<S> { - /// Pre-emptively cancel the task as part of the shutdown process. + /// Pre-emptively cancels the task as part of the shutdown process. pub(crate) fn shutdown(self) { let raw = self.raw; mem::forget(self); @@ -336,7 +336,7 @@ impl<S: Schedule> Task<S> { } impl<S: Schedule> LocalNotified<S> { - /// Run the task + /// Runs the task. pub(crate) fn run(self) { let raw = self.task.raw; mem::forget(self); @@ -420,7 +420,7 @@ impl<S> fmt::Debug for Notified<S> { /// # Safety /// -/// Tasks are pinned +/// Tasks are pinned. unsafe impl<S> linked_list::Link for Task<S> { type Handle = Task<S>; type Target = Header; diff --git a/src/runtime/task/raw.rs b/src/runtime/task/raw.rs index 8c2c3f7..fbc9574 100644 --- a/src/runtime/task/raw.rs +++ b/src/runtime/task/raw.rs @@ -10,22 +10,22 @@ pub(super) struct RawTask { } pub(super) struct Vtable { - /// Poll the future + /// Polls the future. pub(super) poll: unsafe fn(NonNull<Header>), - /// Deallocate the memory + /// Deallocates the memory. pub(super) dealloc: unsafe fn(NonNull<Header>), - /// Read the task output, if complete + /// Reads the task output, if complete. pub(super) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker), - /// The join handle has been dropped + /// The join handle has been dropped. pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>), - /// The task is remotely aborted + /// The task is remotely aborted. pub(super) remote_abort: unsafe fn(NonNull<Header>), - /// Scheduler is being shutdown + /// Scheduler is being shutdown. pub(super) shutdown: unsafe fn(NonNull<Header>), } diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs index 059a7f9..c2d5b28 100644 --- a/src/runtime/task/state.rs +++ b/src/runtime/task/state.rs @@ -8,7 +8,7 @@ pub(super) struct State { val: AtomicUsize, } -/// Current state value +/// Current state value. #[derive(Copy, Clone)] pub(super) struct Snapshot(usize); @@ -19,20 +19,20 @@ const RUNNING: usize = 0b0001; /// The task is complete. /// -/// Once this bit is set, it is never unset +/// Once this bit is set, it is never unset. const COMPLETE: usize = 0b0010; -/// Extracts the task's lifecycle value from the state +/// Extracts the task's lifecycle value from the state. const LIFECYCLE_MASK: usize = 0b11; /// Flag tracking if the task has been pushed into a run queue. const NOTIFIED: usize = 0b100; -/// The join handle is still around +/// The join handle is still around. #[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556 const JOIN_INTEREST: usize = 0b1_000; -/// A join handle waker has been set +/// A join handle waker has been set. #[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556 const JOIN_WAKER: usize = 0b10_000; @@ -40,19 +40,19 @@ const JOIN_WAKER: usize = 0b10_000; #[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556 const CANCELLED: usize = 0b100_000; -/// All bits +/// All bits. const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED; /// Bits used by the ref count portion of the state. const REF_COUNT_MASK: usize = !STATE_MASK; -/// Number of positions to shift the ref count +/// Number of positions to shift the ref count. const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize; -/// One ref count +/// One ref count. const REF_ONE: usize = 1 << REF_COUNT_SHIFT; -/// State a task is initialized with +/// State a task is initialized with. /// /// A task is initialized with three references: /// @@ -96,7 +96,7 @@ pub(super) enum TransitionToNotifiedByRef { /// All transitions are performed via RMW operations. This establishes an /// unambiguous modification order. impl State { - /// Return a task's initial state + /// Returns a task's initial state. pub(super) fn new() -> State { // The raw task returned by this method has a ref-count of three. See // the comment on INITIAL_STATE for more. @@ -110,7 +110,7 @@ impl State { Snapshot(self.val.load(Acquire)) } - /// Attempt to transition the lifecycle to `Running`. This sets the + /// Attempts to transition the lifecycle to `Running`. This sets the /// notified bit to false so notifications during the poll can be detected. pub(super) fn transition_to_running(&self) -> TransitionToRunning { self.fetch_update_action(|mut next| { @@ -190,7 +190,7 @@ impl State { Snapshot(prev.0 ^ DELTA) } - /// Transition from `Complete` -> `Terminal`, decrementing the reference + /// Transitions from `Complete` -> `Terminal`, decrementing the reference /// count the specified number of times. /// /// Returns true if the task should be deallocated. @@ -270,10 +270,10 @@ impl State { }) } - /// Set the cancelled bit and transition the state to `NOTIFIED` if idle. + /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle. /// /// Returns `true` if the task needs to be submitted to the pool for - /// execution + /// execution. pub(super) fn transition_to_notified_and_cancel(&self) -> bool { self.fetch_update_action(|mut snapshot| { if snapshot.is_cancelled() || snapshot.is_complete() { @@ -306,7 +306,7 @@ impl State { }) } - /// Set the `CANCELLED` bit and attempt to transition to `Running`. + /// Sets the `CANCELLED` bit and attempts to transition to `Running`. /// /// Returns `true` if the transition to `Running` succeeded. pub(super) fn transition_to_shutdown(&self) -> bool { @@ -330,7 +330,7 @@ impl State { } /// Optimistically tries to swap the state assuming the join handle is - /// __immediately__ dropped on spawn + /// __immediately__ dropped on spawn. pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> { use std::sync::atomic::Ordering::Relaxed; @@ -352,7 +352,7 @@ impl State { .map_err(|_| ()) } - /// Try to unset the JOIN_INTEREST flag. + /// Tries to unset the JOIN_INTEREST flag. /// /// Returns `Ok` if the operation happens before the task transitions to a /// completed state, `Err` otherwise. @@ -371,7 +371,7 @@ impl State { }) } - /// Set the `JOIN_WAKER` bit. + /// Sets the `JOIN_WAKER` bit. /// /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if /// the task has completed. diff --git a/src/runtime/tests/loom_basic_scheduler.rs b/src/runtime/tests/loom_basic_scheduler.rs index e6221d3..d2894b9 100644 --- a/src/runtime/tests/loom_basic_scheduler.rs +++ b/src/runtime/tests/loom_basic_scheduler.rs @@ -63,6 +63,45 @@ fn block_on_num_polls() { }); } +#[test] +fn assert_no_unnecessary_polls() { + loom::model(|| { + // // After we poll outer future, woken should reset to false + let rt = Builder::new_current_thread().build().unwrap(); + let (tx, rx) = oneshot::channel(); + let pending_cnt = Arc::new(AtomicUsize::new(0)); + + rt.spawn(async move { + for _ in 0..24 { + task::yield_now().await; + } + tx.send(()).unwrap(); + }); + + let pending_cnt_clone = pending_cnt.clone(); + rt.block_on(async move { + // use task::yield_now() to ensure woken set to true + // ResetFuture will be polled at most once + // Here comes two cases + // 1. recv no message from channel, ResetFuture will be polled + // but get Pending and we record ResetFuture.pending_cnt ++. + // Then when message arrive, ResetFuture returns Ready. So we + // expect ResetFuture.pending_cnt = 1 + // 2. recv message from channel, ResetFuture returns Ready immediately. + // We expect ResetFuture.pending_cnt = 0 + task::yield_now().await; + ResetFuture { + rx, + pending_cnt: pending_cnt_clone, + } + .await; + }); + + let pending_cnt = pending_cnt.load(Acquire); + assert!(pending_cnt <= 1); + }); +} + struct BlockedFuture { rx: Receiver<()>, num_polls: Arc<AtomicUsize>, @@ -80,3 +119,22 @@ impl Future for BlockedFuture { } } } + +struct ResetFuture { + rx: Receiver<()>, + pending_cnt: Arc<AtomicUsize>, +} + +impl Future for ResetFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Pending => { + self.pending_cnt.fetch_add(1, Release); + Poll::Pending + } + _ => Poll::Ready(()), + } + } +} diff --git a/src/runtime/thread_pool/idle.rs b/src/runtime/thread_pool/idle.rs index 2cac30e..6b7ee12 100644 --- a/src/runtime/thread_pool/idle.rs +++ b/src/runtime/thread_pool/idle.rs @@ -126,7 +126,7 @@ impl Idle { } } - /// Returns `true` if `worker_id` is contained in the sleep set + /// Returns `true` if `worker_id` is contained in the sleep set. pub(super) fn is_parked(&self, worker_id: usize) -> bool { let sleepers = self.sleepers.lock(); sleepers.contains(&worker_id) diff --git a/src/runtime/thread_pool/mod.rs b/src/runtime/thread_pool/mod.rs index f2e68f6..82e34c7 100644 --- a/src/runtime/thread_pool/mod.rs +++ b/src/runtime/thread_pool/mod.rs @@ -24,7 +24,7 @@ pub(crate) struct ThreadPool { spawner: Spawner, } -/// Submit futures to the associated thread pool for execution. +/// Submits futures to the associated thread pool for execution. /// /// A `Spawner` instance is a handle to a single thread pool that allows the owner /// of the handle to spawn futures onto the thread pool. diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs index 44f8db8..ae8efe6 100644 --- a/src/runtime/thread_pool/worker.rs +++ b/src/runtime/thread_pool/worker.rs @@ -126,7 +126,7 @@ pub(super) struct Shared { /// how they communicate between each other. remotes: Box<[Remote]>, - /// Submit work to the scheduler while **not** currently on a worker thread. + /// Submits work to the scheduler while **not** currently on a worker thread. inject: Inject<Arc<Shared>>, /// Coordinates idle workers @@ -147,13 +147,13 @@ pub(super) struct Shared { /// Callback for a worker unparking itself after_unpark: Option<Callback>, - /// Collect stats from the runtime. + /// Collects stats from the runtime. stats: RuntimeStats, } /// Used to communicate with a worker from other threads. struct Remote { - /// Steal tasks from this worker. + /// Steals tasks from this worker. steal: queue::Steal<Arc<Shared>>, /// Unparks the associated worker thread @@ -587,9 +587,9 @@ impl Core { worker.shared.transition_worker_from_searching(); } - /// Prepare the worker state for parking + /// Prepares the worker state for parking. /// - /// Returns true if the transition happend, false if there is work to do first + /// Returns true if the transition happend, false if there is work to do first. fn transition_to_parked(&mut self, worker: &Worker) -> bool { // Workers should not park if they have work to do if self.lifo_slot.is_some() || self.run_queue.has_tasks() { @@ -653,7 +653,7 @@ impl Core { self.stats.submit(&worker.shared.stats); } - /// Shutdown the core + /// Shuts down the core. fn shutdown(&mut self) { // Take the core let mut park = self.park.take().expect("park missing"); @@ -666,7 +666,7 @@ impl Core { } impl Worker { - /// Returns a reference to the scheduler's injection queue + /// Returns a reference to the scheduler's injection queue. fn inject(&self) -> &Inject<Arc<Shared>> { &self.shared.inject } diff --git a/src/signal/ctrl_c.rs b/src/signal/ctrl_c.rs index 1eeeb85..b26ab7e 100644 --- a/src/signal/ctrl_c.rs +++ b/src/signal/ctrl_c.rs @@ -47,6 +47,15 @@ use std::io; /// println!("received ctrl-c event"); /// } /// ``` +/// +/// Listen in the background: +/// +/// ```rust,no_run +/// tokio::spawn(async move { +/// tokio::signal::ctrl_c().await.unwrap(); +/// // Your handler here +/// }); +/// ``` pub async fn ctrl_c() -> io::Result<()> { os_impl::ctrl_c()?.recv().await; Ok(()) diff --git a/src/signal/mod.rs b/src/signal/mod.rs index fe572f0..882218a 100644 --- a/src/signal/mod.rs +++ b/src/signal/mod.rs @@ -1,4 +1,4 @@ -//! Asynchronous signal handling for Tokio +//! Asynchronous signal handling for Tokio. //! //! Note that signal handling is in general a very tricky topic and should be //! used with great care. This crate attempts to implement 'best practice' for diff --git a/src/signal/reusable_box.rs b/src/signal/reusable_box.rs index 426ecb0..796fa21 100644 --- a/src/signal/reusable_box.rs +++ b/src/signal/reusable_box.rs @@ -30,7 +30,7 @@ impl<T> ReusableBoxFuture<T> { Self { boxed } } - /// Replace the future currently stored in this box. + /// Replaces the future currently stored in this box. /// /// This reallocates if and only if the layout of the provided future is /// different from the layout of the currently stored future. @@ -43,7 +43,7 @@ impl<T> ReusableBoxFuture<T> { } } - /// Replace the future currently stored in this box. + /// Replaces the future currently stored in this box. /// /// This function never reallocates, but returns an error if the provided /// future has a different size or alignment from the currently stored @@ -70,7 +70,7 @@ impl<T> ReusableBoxFuture<T> { } } - /// Set the current future. + /// Sets the current future. /// /// # Safety /// @@ -103,14 +103,14 @@ impl<T> ReusableBoxFuture<T> { } } - /// Get a pinned reference to the underlying future. + /// Gets a pinned reference to the underlying future. pub(crate) fn get_pin(&mut self) -> Pin<&mut (dyn Future<Output = T> + Send)> { // SAFETY: The user of this box cannot move the box, and we do not move it // either. unsafe { Pin::new_unchecked(self.boxed.as_mut()) } } - /// Poll the future stored inside this box. + /// Polls the future stored inside this box. pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<T> { self.get_pin().poll(cx) } @@ -119,7 +119,7 @@ impl<T> ReusableBoxFuture<T> { impl<T> Future for ReusableBoxFuture<T> { type Output = T; - /// Poll the future stored inside this box. + /// Polls the future stored inside this box. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { Pin::into_inner(self).get_pin().poll(cx) } diff --git a/src/sync/batch_semaphore.rs b/src/sync/batch_semaphore.rs index 9b43404..b5c39d2 100644 --- a/src/sync/batch_semaphore.rs +++ b/src/sync/batch_semaphore.rs @@ -1,5 +1,5 @@ #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] -//! # Implementation Details +//! # Implementation Details. //! //! The semaphore is implemented using an intrusive linked list of waiters. An //! atomic counter tracks the number of available permits. If the semaphore does @@ -138,7 +138,7 @@ impl Semaphore { } } - /// Creates a new semaphore with the initial number of permits + /// Creates a new semaphore with the initial number of permits. /// /// Maximum number of permits on 32-bit platforms is `1<<29`. /// @@ -159,7 +159,7 @@ impl Semaphore { } } - /// Returns the current number of available permits + /// Returns the current number of available permits. pub(crate) fn available_permits(&self) -> usize { self.permits.load(Acquire) >> Self::PERMIT_SHIFT } @@ -197,7 +197,7 @@ impl Semaphore { } } - /// Returns true if the semaphore is closed + /// Returns true if the semaphore is closed. pub(crate) fn is_closed(&self) -> bool { self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED } diff --git a/src/sync/broadcast.rs b/src/sync/broadcast.rs index a2ca445..0d9cd3b 100644 --- a/src/sync/broadcast.rs +++ b/src/sync/broadcast.rs @@ -293,37 +293,37 @@ pub mod error { use self::error::*; -/// Data shared between senders and receivers +/// Data shared between senders and receivers. struct Shared<T> { - /// slots in the channel + /// slots in the channel. buffer: Box<[RwLock<Slot<T>>]>, - /// Mask a position -> index + /// Mask a position -> index. mask: usize, /// Tail of the queue. Includes the rx wait list. tail: Mutex<Tail>, - /// Number of outstanding Sender handles + /// Number of outstanding Sender handles. num_tx: AtomicUsize, } -/// Next position to write a value +/// Next position to write a value. struct Tail { - /// Next position to write to + /// Next position to write to. pos: u64, - /// Number of active receivers + /// Number of active receivers. rx_cnt: usize, - /// True if the channel is closed + /// True if the channel is closed. closed: bool, - /// Receivers waiting for a value + /// Receivers waiting for a value. waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>, } -/// Slot in the buffer +/// Slot in the buffer. struct Slot<T> { /// Remaining number of receivers that are expected to see this value. /// @@ -333,7 +333,7 @@ struct Slot<T> { /// acquired. rem: AtomicUsize, - /// Uniquely identifies the `send` stored in the slot + /// Uniquely identifies the `send` stored in the slot. pos: u64, /// True signals the channel is closed. @@ -346,9 +346,9 @@ struct Slot<T> { val: UnsafeCell<Option<T>>, } -/// An entry in the wait queue +/// An entry in the wait queue. struct Waiter { - /// True if queued + /// True if queued. queued: bool, /// Task waiting on the broadcast channel. @@ -365,12 +365,12 @@ struct RecvGuard<'a, T> { slot: RwLockReadGuard<'a, Slot<T>>, } -/// Receive a value future +/// Receive a value future. struct Recv<'a, T> { - /// Receiver being waited on + /// Receiver being waited on. receiver: &'a mut Receiver<T>, - /// Entry in the waiter `LinkedList` + /// Entry in the waiter `LinkedList`. waiter: UnsafeCell<Waiter>, } diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs index 6e7b700..58f4a9f 100644 --- a/src/sync/mpsc/block.rs +++ b/src/sync/mpsc/block.rs @@ -40,7 +40,7 @@ struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]); use super::BLOCK_CAP; -/// Masks an index to get the block identifier +/// Masks an index to get the block identifier. const BLOCK_MASK: usize = !(BLOCK_CAP - 1); /// Masks an index to get the value offset in a block. @@ -89,7 +89,7 @@ impl<T> Block<T> { } } - /// Returns `true` if the block matches the given index + /// Returns `true` if the block matches the given index. pub(crate) fn is_at_index(&self, index: usize) -> bool { debug_assert!(offset(index) == 0); self.start_index == index diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs index bcad84d..5a2bfa6 100644 --- a/src/sync/mpsc/bounded.rs +++ b/src/sync/mpsc/bounded.rs @@ -10,7 +10,7 @@ cfg_time! { use std::fmt; use std::task::{Context, Poll}; -/// Send values to the associated `Receiver`. +/// Sends values to the associated `Receiver`. /// /// Instances are created by the [`channel`](channel) function. /// @@ -22,7 +22,7 @@ pub struct Sender<T> { chan: chan::Tx<T, Semaphore>, } -/// Permit to send one value into the channel. +/// Permits to send one value into the channel. /// /// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`] /// and are used to guarantee channel capacity before generating a message to send. @@ -49,7 +49,7 @@ pub struct OwnedPermit<T> { chan: Option<chan::Tx<T, Semaphore>>, } -/// Receive values from the associated `Sender`. +/// Receives values from the associated `Sender`. /// /// Instances are created by the [`channel`](channel) function. /// @@ -57,7 +57,7 @@ pub struct OwnedPermit<T> { /// /// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html pub struct Receiver<T> { - /// The channel receiver + /// The channel receiver. chan: chan::Rx<T, Semaphore>, } @@ -187,7 +187,7 @@ impl<T> Receiver<T> { poll_fn(|cx| self.chan.recv(cx)).await } - /// Try to receive the next value for this receiver. + /// Tries to receive the next value for this receiver. /// /// This method returns the [`Empty`] error if the channel is currently /// empty, but there are still outstanding [senders] or [permits]. @@ -672,7 +672,7 @@ impl<T> Sender<T> { self.chan.is_closed() } - /// Wait for channel capacity. Once capacity to send one message is + /// Waits for channel capacity. Once capacity to send one message is /// available, it is reserved for the caller. /// /// If the channel is full, the function waits for the number of unreceived @@ -721,7 +721,7 @@ impl<T> Sender<T> { Ok(Permit { chan: &self.chan }) } - /// Wait for channel capacity, moving the `Sender` and returning an owned + /// Waits for channel capacity, moving the `Sender` and returning an owned /// permit. Once capacity to send one message is available, it is reserved /// for the caller. /// @@ -815,7 +815,7 @@ impl<T> Sender<T> { } } - /// Try to acquire a slot in the channel without waiting for the slot to become + /// Tries to acquire a slot in the channel without waiting for the slot to become /// available. /// /// If the channel is full this function will return [`TrySendError`], otherwise @@ -868,7 +868,7 @@ impl<T> Sender<T> { Ok(Permit { chan: &self.chan }) } - /// Try to acquire a slot in the channel without waiting for the slot to become + /// Tries to acquire a slot in the channel without waiting for the slot to become /// available, returning an owned permit. /// /// This moves the sender _by value_, and returns an owned permit that can @@ -1117,7 +1117,7 @@ impl<T> OwnedPermit<T> { Sender { chan } } - /// Release the reserved capacity *without* sending a message, returning the + /// Releases the reserved capacity *without* sending a message, returning the /// [`Sender`]. /// /// # Examples diff --git a/src/sync/mpsc/chan.rs b/src/sync/mpsc/chan.rs index 637ae1f..c3007de 100644 --- a/src/sync/mpsc/chan.rs +++ b/src/sync/mpsc/chan.rs @@ -14,7 +14,7 @@ use std::sync::atomic::Ordering::{AcqRel, Relaxed}; use std::task::Poll::{Pending, Ready}; use std::task::{Context, Poll}; -/// Channel sender +/// Channel sender. pub(crate) struct Tx<T, S> { inner: Arc<Chan<T, S>>, } @@ -25,7 +25,7 @@ impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> { } } -/// Channel receiver +/// Channel receiver. pub(crate) struct Rx<T, S: Semaphore> { inner: Arc<Chan<T, S>>, } @@ -47,7 +47,7 @@ pub(crate) trait Semaphore { } struct Chan<T, S> { - /// Notifies all tasks listening for the receiver being dropped + /// Notifies all tasks listening for the receiver being dropped. notify_rx_closed: Notify, /// Handle to the push half of the lock-free list. diff --git a/src/sync/mpsc/error.rs b/src/sync/mpsc/error.rs index 48ca379..b7b9cf7 100644 --- a/src/sync/mpsc/error.rs +++ b/src/sync/mpsc/error.rs @@ -1,4 +1,4 @@ -//! Channel error types +//! Channel error types. use std::error::Error; use std::fmt; diff --git a/src/sync/mpsc/list.rs b/src/sync/mpsc/list.rs index 53c34d2..e4eeb45 100644 --- a/src/sync/mpsc/list.rs +++ b/src/sync/mpsc/list.rs @@ -8,7 +8,7 @@ use std::fmt; use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; -/// List queue transmit handle +/// List queue transmit handle. pub(crate) struct Tx<T> { /// Tail in the `Block` mpmc list. block_tail: AtomicPtr<Block<T>>, @@ -79,7 +79,7 @@ impl<T> Tx<T> { } } - /// Closes the send half of the list + /// Closes the send half of the list. /// /// Similar process as pushing a value, but instead of writing the value & /// setting the ready flag, the TX_CLOSED flag is set on the block. diff --git a/src/sync/mpsc/unbounded.rs b/src/sync/mpsc/unbounded.rs index 8961930..b133f9f 100644 --- a/src/sync/mpsc/unbounded.rs +++ b/src/sync/mpsc/unbounded.rs @@ -129,7 +129,7 @@ impl<T> UnboundedReceiver<T> { poll_fn(|cx| self.poll_recv(cx)).await } - /// Try to receive the next value for this receiver. + /// Tries to receive the next value for this receiver. /// /// This method returns the [`Empty`] error if the channel is currently /// empty, but there are still outstanding [senders] or [permits]. diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 6acd28b..4d9f988 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -301,6 +301,40 @@ impl<T: ?Sized> Mutex<T> { MutexGuard { lock: self } } + /// Blocking lock this mutex. When the lock has been acquired, function returns a + /// [`MutexGuard`]. + /// + /// This method is intended for use cases where you + /// need to use this mutex in asynchronous code as well as in synchronous code. + /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::Mutex; + /// + /// #[tokio::main] + /// async fn main() { + /// let mutex = Arc::new(Mutex::new(1)); + /// + /// let mutex1 = Arc::clone(&mutex); + /// let sync_code = tokio::task::spawn_blocking(move || { + /// let mut n = mutex1.blocking_lock(); + /// *n = 2; + /// }); + /// + /// sync_code.await.unwrap(); + /// + /// let n = mutex.lock().await; + /// assert_eq!(*n, 2); + /// } + /// + /// ``` + #[cfg(feature = "sync")] + pub fn blocking_lock(&self) -> MutexGuard<'_, T> { + crate::future::block_on(self.lock()) + } + /// Locks this mutex, causing the current task to yield until the lock has /// been acquired. When the lock has been acquired, this returns an /// [`OwnedMutexGuard`]. @@ -462,14 +496,14 @@ where } } -impl<T> std::fmt::Debug for Mutex<T> +impl<T: ?Sized> std::fmt::Debug for Mutex<T> where T: std::fmt::Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut d = f.debug_struct("Mutex"); match self.try_lock() { - Ok(inner) => d.field("data", &*inner), + Ok(inner) => d.field("data", &&*inner), Err(_) => d.field("data", &format_args!("<locked>")), }; d.finish() diff --git a/src/sync/notify.rs b/src/sync/notify.rs index 74b97cc..c93ce3b 100644 --- a/src/sync/notify.rs +++ b/src/sync/notify.rs @@ -20,7 +20,7 @@ use std::task::{Context, Poll, Waker}; type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; -/// Notify a single task to wake up. +/// Notifies a single task to wake up. /// /// `Notify` provides a basic mechanism to notify a single task of an event. /// `Notify` itself does not carry any data. Instead, it is to be used to signal @@ -57,13 +57,16 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; /// let notify = Arc::new(Notify::new()); /// let notify2 = notify.clone(); /// -/// tokio::spawn(async move { +/// let handle = tokio::spawn(async move { /// notify2.notified().await; /// println!("received notification"); /// }); /// /// println!("sending notification"); /// notify.notify_one(); +/// +/// // Wait for task to receive notification. +/// handle.await.unwrap(); /// } /// ``` /// @@ -128,10 +131,10 @@ enum NotificationType { #[derive(Debug)] struct Waiter { - /// Intrusive linked-list pointers + /// Intrusive linked-list pointers. pointers: linked_list::Pointers<Waiter>, - /// Waiting task's waker + /// Waiting task's waker. waker: Option<Waker>, /// `true` if the notification has been assigned to this waiter. @@ -168,13 +171,13 @@ const NOTIFY_WAITERS_SHIFT: usize = 2; const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1; const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK; -/// Initial "idle" state +/// Initial "idle" state. const EMPTY: usize = 0; /// One or more threads are currently waiting to be notified. const WAITING: usize = 1; -/// Pending notification +/// Pending notification. const NOTIFIED: usize = 2; fn set_state(data: usize, state: usize) -> usize { @@ -289,7 +292,7 @@ impl Notify { } } - /// Notifies a waiting task + /// Notifies a waiting task. /// /// If a task is currently waiting, that task is notified. Otherwise, a /// permit is stored in this `Notify` value and the **next** call to @@ -359,7 +362,7 @@ impl Notify { } } - /// Notifies all waiting tasks + /// Notifies all waiting tasks. /// /// If a task is currently waiting, that task is notified. Unlike with /// `notify_one()`, no permit is stored to be used by the next call to @@ -551,6 +554,10 @@ impl Future for Notified<'_> { return Poll::Ready(()); } + // Clone the waker before locking, a waker clone can be + // triggering arbitrary code. + let waker = cx.waker().clone(); + // Acquire the lock and attempt to transition to the waiting // state. let mut waiters = notify.waiters.lock(); @@ -612,7 +619,7 @@ impl Future for Notified<'_> { // Safety: called while locked. unsafe { - (*waiter.get()).waker = Some(cx.waker().clone()); + (*waiter.get()).waker = Some(waker); } // Insert the waiter into the linked list diff --git a/src/sync/once_cell.rs b/src/sync/once_cell.rs index 91705a5..d31a40e 100644 --- a/src/sync/once_cell.rs +++ b/src/sync/once_cell.rs @@ -245,7 +245,7 @@ impl<T> OnceCell<T> { } } - /// Set the value of the `OnceCell` to the given value if the `OnceCell` is + /// Sets the value of the `OnceCell` to the given value if the `OnceCell` is /// empty. /// /// If the `OnceCell` already has a value, this call will fail with an @@ -283,7 +283,7 @@ impl<T> OnceCell<T> { } } - /// Get the value currently in the `OnceCell`, or initialize it with the + /// Gets the value currently in the `OnceCell`, or initialize it with the /// given asynchronous operation. /// /// If some other task is currently working on initializing the `OnceCell`, @@ -331,7 +331,7 @@ impl<T> OnceCell<T> { } } - /// Get the value currently in the `OnceCell`, or initialize it with the + /// Gets the value currently in the `OnceCell`, or initialize it with the /// given asynchronous operation. /// /// If some other task is currently working on initializing the `OnceCell`, @@ -382,7 +382,7 @@ impl<T> OnceCell<T> { } } - /// Take the value from the cell, destroying the cell in the process. + /// Takes the value from the cell, destroying the cell in the process. /// Returns `None` if the cell is empty. pub fn into_inner(mut self) -> Option<T> { if self.initialized_mut() { diff --git a/src/sync/oneshot.rs b/src/sync/oneshot.rs index 0df6037..4fb22ec 100644 --- a/src/sync/oneshot.rs +++ b/src/sync/oneshot.rs @@ -51,6 +51,70 @@ //! } //! } //! ``` +//! +//! To use a oneshot channel in a `tokio::select!` loop, add `&mut` in front of +//! the channel. +//! +//! ``` +//! use tokio::sync::oneshot; +//! use tokio::time::{interval, sleep, Duration}; +//! +//! #[tokio::main] +//! # async fn _doc() {} +//! # #[tokio::main(flavor = "current_thread", start_paused = true)] +//! async fn main() { +//! let (send, mut recv) = oneshot::channel(); +//! let mut interval = interval(Duration::from_millis(100)); +//! +//! # let handle = +//! tokio::spawn(async move { +//! sleep(Duration::from_secs(1)).await; +//! send.send("shut down").unwrap(); +//! }); +//! +//! loop { +//! tokio::select! { +//! _ = interval.tick() => println!("Another 100ms"), +//! msg = &mut recv => { +//! println!("Got message: {}", msg.unwrap()); +//! break; +//! } +//! } +//! } +//! # handle.await.unwrap(); +//! } +//! ``` +//! +//! To use a `Sender` from a destructor, put it in an [`Option`] and call +//! [`Option::take`]. +//! +//! ``` +//! use tokio::sync::oneshot; +//! +//! struct SendOnDrop { +//! sender: Option<oneshot::Sender<&'static str>>, +//! } +//! impl Drop for SendOnDrop { +//! fn drop(&mut self) { +//! if let Some(sender) = self.sender.take() { +//! // Using `let _ =` to ignore send errors. +//! let _ = sender.send("I got dropped!"); +//! } +//! } +//! } +//! +//! #[tokio::main] +//! # async fn _doc() {} +//! # #[tokio::main(flavor = "current_thread")] +//! async fn main() { +//! let (send, recv) = oneshot::channel(); +//! +//! let send_on_drop = SendOnDrop { sender: Some(send) }; +//! drop(send_on_drop); +//! +//! assert_eq!(recv.await, Ok("I got dropped!")); +//! } +//! ``` use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; @@ -68,16 +132,98 @@ use std::task::{Context, Poll, Waker}; /// /// A pair of both a [`Sender`] and a [`Receiver`] are created by the /// [`channel`](fn@channel) function. +/// +/// # Examples +/// +/// ``` +/// use tokio::sync::oneshot; +/// +/// #[tokio::main] +/// async fn main() { +/// let (tx, rx) = oneshot::channel(); +/// +/// tokio::spawn(async move { +/// if let Err(_) = tx.send(3) { +/// println!("the receiver dropped"); +/// } +/// }); +/// +/// match rx.await { +/// Ok(v) => println!("got = {:?}", v), +/// Err(_) => println!("the sender dropped"), +/// } +/// } +/// ``` +/// +/// If the sender is dropped without sending, the receiver will fail with +/// [`error::RecvError`]: +/// +/// ``` +/// use tokio::sync::oneshot; +/// +/// #[tokio::main] +/// async fn main() { +/// let (tx, rx) = oneshot::channel::<u32>(); +/// +/// tokio::spawn(async move { +/// drop(tx); +/// }); +/// +/// match rx.await { +/// Ok(_) => panic!("This doesn't happen"), +/// Err(_) => println!("the sender dropped"), +/// } +/// } +/// ``` +/// +/// To use a `Sender` from a destructor, put it in an [`Option`] and call +/// [`Option::take`]. +/// +/// ``` +/// use tokio::sync::oneshot; +/// +/// struct SendOnDrop { +/// sender: Option<oneshot::Sender<&'static str>>, +/// } +/// impl Drop for SendOnDrop { +/// fn drop(&mut self) { +/// if let Some(sender) = self.sender.take() { +/// // Using `let _ =` to ignore send errors. +/// let _ = sender.send("I got dropped!"); +/// } +/// } +/// } +/// +/// #[tokio::main] +/// # async fn _doc() {} +/// # #[tokio::main(flavor = "current_thread")] +/// async fn main() { +/// let (send, recv) = oneshot::channel(); +/// +/// let send_on_drop = SendOnDrop { sender: Some(send) }; +/// drop(send_on_drop); +/// +/// assert_eq!(recv.await, Ok("I got dropped!")); +/// } +/// ``` +/// +/// [`Option`]: std::option::Option +/// [`Option::take`]: std::option::Option::take #[derive(Debug)] pub struct Sender<T> { inner: Option<Arc<Inner<T>>>, } -/// Receive a value from the associated [`Sender`]. +/// Receives a value from the associated [`Sender`]. /// /// A pair of both a [`Sender`] and a [`Receiver`] are created by the /// [`channel`](fn@channel) function. /// +/// This channel has no `recv` method because the receiver itself implements the +/// [`Future`] trait. To receive a value, `.await` the `Receiver` object directly. +/// +/// [`Future`]: trait@std::future::Future +/// /// # Examples /// /// ``` @@ -120,13 +266,46 @@ pub struct Sender<T> { /// } /// } /// ``` +/// +/// To use a `Receiver` in a `tokio::select!` loop, add `&mut` in front of the +/// channel. +/// +/// ``` +/// use tokio::sync::oneshot; +/// use tokio::time::{interval, sleep, Duration}; +/// +/// #[tokio::main] +/// # async fn _doc() {} +/// # #[tokio::main(flavor = "current_thread", start_paused = true)] +/// async fn main() { +/// let (send, mut recv) = oneshot::channel(); +/// let mut interval = interval(Duration::from_millis(100)); +/// +/// # let handle = +/// tokio::spawn(async move { +/// sleep(Duration::from_secs(1)).await; +/// send.send("shut down").unwrap(); +/// }); +/// +/// loop { +/// tokio::select! { +/// _ = interval.tick() => println!("Another 100ms"), +/// msg = &mut recv => { +/// println!("Got message: {}", msg.unwrap()); +/// break; +/// } +/// } +/// } +/// # handle.await.unwrap(); +/// } +/// ``` #[derive(Debug)] pub struct Receiver<T> { inner: Option<Arc<Inner<T>>>, } pub mod error { - //! Oneshot error types + //! Oneshot error types. use std::fmt; @@ -171,7 +350,7 @@ pub mod error { use self::error::*; struct Inner<T> { - /// Manages the state of the inner cell + /// Manages the state of the inner cell. state: AtomicUsize, /// The value. This is set by `Sender` and read by `Receiver`. The state of @@ -179,9 +358,19 @@ struct Inner<T> { value: UnsafeCell<Option<T>>, /// The task to notify when the receiver drops without consuming the value. + /// + /// ## Safety + /// + /// The `TX_TASK_SET` bit in the `state` field is set if this field is + /// initialized. If that bit is unset, this field may be uninitialized. tx_task: Task, /// The task to notify when the value is sent. + /// + /// ## Safety + /// + /// The `RX_TASK_SET` bit in the `state` field is set if this field is + /// initialized. If that bit is unset, this field may be uninitialized. rx_task: Task, } @@ -220,7 +409,7 @@ impl Task { #[derive(Clone, Copy)] struct State(usize); -/// Create a new one-shot channel for sending single values across asynchronous +/// Creates a new one-shot channel for sending single values across asynchronous /// tasks. /// /// The function returns separate "send" and "receive" handles. The `Sender` @@ -311,11 +500,24 @@ impl<T> Sender<T> { let inner = self.inner.take().unwrap(); inner.value.with_mut(|ptr| unsafe { + // SAFETY: The receiver will not access the `UnsafeCell` unless the + // channel has been marked as "complete" (the `VALUE_SENT` state bit + // is set). + // That bit is only set by the sender later on in this method, and + // calling this method consumes `self`. Therefore, if it was possible to + // call this method, we know that the `VALUE_SENT` bit is unset, and + // the receiver is not currently accessing the `UnsafeCell`. *ptr = Some(t); }); if !inner.complete() { unsafe { + // SAFETY: The receiver will not access the `UnsafeCell` unless + // the channel has been marked as "complete". Calling + // `complete()` will return true if this bit is set, and false + // if it is not set. Thus, if `complete()` returned false, it is + // safe for us to access the value, because we know that the + // receiver will not. return Err(inner.consume_value().unwrap()); } } @@ -430,7 +632,7 @@ impl<T> Sender<T> { state.is_closed() } - /// Check whether the oneshot channel has been closed, and if not, schedules the + /// Checks whether the oneshot channel has been closed, and if not, schedules the /// `Waker` in the provided `Context` to receive a notification when the channel is /// closed. /// @@ -661,6 +863,11 @@ impl<T> Receiver<T> { let state = State::load(&inner.state, Acquire); if state.is_complete() { + // SAFETY: If `state.is_complete()` returns true, then the + // `VALUE_SENT` bit has been set and the sender side of the + // channel will no longer attempt to access the inner + // `UnsafeCell`. Therefore, it is now safe for us to access the + // cell. match unsafe { inner.consume_value() } { Some(value) => Ok(value), None => Err(TryRecvError::Closed), @@ -751,6 +958,11 @@ impl<T> Inner<T> { State::set_rx_task(&self.state); coop.made_progress(); + // SAFETY: If `state.is_complete()` returns true, then the + // `VALUE_SENT` bit has been set and the sender side of the + // channel will no longer attempt to access the inner + // `UnsafeCell`. Therefore, it is now safe for us to access the + // cell. return match unsafe { self.consume_value() } { Some(value) => Ready(Ok(value)), None => Ready(Err(RecvError(()))), @@ -797,6 +1009,14 @@ impl<T> Inner<T> { } /// Consumes the value. This function does not check `state`. + /// + /// # Safety + /// + /// Calling this method concurrently on multiple threads will result in a + /// data race. The `VALUE_SENT` state bit is used to ensure that only the + /// sender *or* the receiver will call this method at a given point in time. + /// If `VALUE_SENT` is not set, then only the sender may call this method; + /// if it is set, then only the receiver may call this method. unsafe fn consume_value(&self) -> Option<T> { self.value.with_mut(|ptr| (*ptr).take()) } @@ -837,9 +1057,28 @@ impl<T: fmt::Debug> fmt::Debug for Inner<T> { } } +/// Indicates that a waker for the receiving task has been set. +/// +/// # Safety +/// +/// If this bit is not set, the `rx_task` field may be uninitialized. const RX_TASK_SET: usize = 0b00001; +/// Indicates that a value has been stored in the channel's inner `UnsafeCell`. +/// +/// # Safety +/// +/// This bit controls which side of the channel is permitted to access the +/// `UnsafeCell`. If it is set, the `UnsafeCell` may ONLY be accessed by the +/// receiver. If this bit is NOT set, the `UnsafeCell` may ONLY be accessed by +/// the sender. const VALUE_SENT: usize = 0b00010; const CLOSED: usize = 0b00100; + +/// Indicates that a waker for the sending task has been set. +/// +/// # Safety +/// +/// If this bit is not set, the `tx_task` field may be uninitialized. const TX_TASK_SET: usize = 0b01000; impl State { @@ -852,11 +1091,38 @@ impl State { } fn set_complete(cell: &AtomicUsize) -> State { - // TODO: This could be `Release`, followed by an `Acquire` fence *if* - // the `RX_TASK_SET` flag is set. However, `loom` does not support - // fences yet. - let val = cell.fetch_or(VALUE_SENT, AcqRel); - State(val) + // This method is a compare-and-swap loop rather than a fetch-or like + // other `set_$WHATEVER` methods on `State`. This is because we must + // check if the state has been closed before setting the `VALUE_SENT` + // bit. + // + // We don't want to set both the `VALUE_SENT` bit if the `CLOSED` + // bit is already set, because `VALUE_SENT` will tell the receiver that + // it's okay to access the inner `UnsafeCell`. Immediately after calling + // `set_complete`, if the channel was closed, the sender will _also_ + // access the `UnsafeCell` to take the value back out, so if a + // `poll_recv` or `try_recv` call is occurring concurrently, both + // threads may try to access the `UnsafeCell` if we were to set the + // `VALUE_SENT` bit on a closed channel. + let mut state = cell.load(Ordering::Relaxed); + loop { + if State(state).is_closed() { + break; + } + // TODO: This could be `Release`, followed by an `Acquire` fence *if* + // the `RX_TASK_SET` flag is set. However, `loom` does not support + // fences yet. + match cell.compare_exchange_weak( + state, + state | VALUE_SENT, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => state = actual, + } + } + State(state) } fn is_rx_task_set(self) -> bool { diff --git a/src/sync/rwlock/owned_read_guard.rs b/src/sync/rwlock/owned_read_guard.rs index b7f3926..1881295 100644 --- a/src/sync/rwlock/owned_read_guard.rs +++ b/src/sync/rwlock/owned_read_guard.rs @@ -22,7 +22,7 @@ pub struct OwnedRwLockReadGuard<T: ?Sized, U: ?Sized = T> { } impl<T: ?Sized, U: ?Sized> OwnedRwLockReadGuard<T, U> { - /// Make a new `OwnedRwLockReadGuard` for a component of the locked data. + /// Makes a new `OwnedRwLockReadGuard` for a component of the locked data. /// This operation cannot fail as the `OwnedRwLockReadGuard` passed in /// already locked the data. /// diff --git a/src/sync/rwlock/owned_write_guard.rs b/src/sync/rwlock/owned_write_guard.rs index 91b6595..0a78d28 100644 --- a/src/sync/rwlock/owned_write_guard.rs +++ b/src/sync/rwlock/owned_write_guard.rs @@ -24,7 +24,7 @@ pub struct OwnedRwLockWriteGuard<T: ?Sized> { } impl<T: ?Sized> OwnedRwLockWriteGuard<T> { - /// Make a new [`OwnedRwLockMappedWriteGuard`] for a component of the locked + /// Makes a new [`OwnedRwLockMappedWriteGuard`] for a component of the locked /// data. /// /// This operation cannot fail as the `OwnedRwLockWriteGuard` passed in diff --git a/src/sync/rwlock/owned_write_guard_mapped.rs b/src/sync/rwlock/owned_write_guard_mapped.rs index 6453236..d88ee01 100644 --- a/src/sync/rwlock/owned_write_guard_mapped.rs +++ b/src/sync/rwlock/owned_write_guard_mapped.rs @@ -23,7 +23,7 @@ pub struct OwnedRwLockMappedWriteGuard<T: ?Sized, U: ?Sized = T> { } impl<T: ?Sized, U: ?Sized> OwnedRwLockMappedWriteGuard<T, U> { - /// Make a new `OwnedRwLockMappedWriteGuard` for a component of the locked + /// Makes a new `OwnedRwLockMappedWriteGuard` for a component of the locked /// data. /// /// This operation cannot fail as the `OwnedRwLockMappedWriteGuard` passed diff --git a/src/sync/rwlock/read_guard.rs b/src/sync/rwlock/read_guard.rs index 38eec77..090b297 100644 --- a/src/sync/rwlock/read_guard.rs +++ b/src/sync/rwlock/read_guard.rs @@ -19,7 +19,7 @@ pub struct RwLockReadGuard<'a, T: ?Sized> { } impl<'a, T: ?Sized> RwLockReadGuard<'a, T> { - /// Make a new `RwLockReadGuard` for a component of the locked data. + /// Makes a new `RwLockReadGuard` for a component of the locked data. /// /// This operation cannot fail as the `RwLockReadGuard` passed in already /// locked the data. diff --git a/src/sync/rwlock/write_guard.rs b/src/sync/rwlock/write_guard.rs index 865a121..8c80ee7 100644 --- a/src/sync/rwlock/write_guard.rs +++ b/src/sync/rwlock/write_guard.rs @@ -22,7 +22,7 @@ pub struct RwLockWriteGuard<'a, T: ?Sized> { } impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { - /// Make a new [`RwLockMappedWriteGuard`] for a component of the locked data. + /// Makes a new [`RwLockMappedWriteGuard`] for a component of the locked data. /// /// This operation cannot fail as the `RwLockWriteGuard` passed in already /// locked the data. diff --git a/src/sync/rwlock/write_guard_mapped.rs b/src/sync/rwlock/write_guard_mapped.rs index 9c5b1e7..3cf69de 100644 --- a/src/sync/rwlock/write_guard_mapped.rs +++ b/src/sync/rwlock/write_guard_mapped.rs @@ -21,7 +21,7 @@ pub struct RwLockMappedWriteGuard<'a, T: ?Sized> { } impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> { - /// Make a new `RwLockMappedWriteGuard` for a component of the locked data. + /// Makes a new `RwLockMappedWriteGuard` for a component of the locked data. /// /// This operation cannot fail as the `RwLockMappedWriteGuard` passed in already /// locked the data. diff --git a/src/sync/task/atomic_waker.rs b/src/sync/task/atomic_waker.rs index 8616007..e1330fb 100644 --- a/src/sync/task/atomic_waker.rs +++ b/src/sync/task/atomic_waker.rs @@ -4,6 +4,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{self, AtomicUsize}; use std::fmt; +use std::panic::{resume_unwind, AssertUnwindSafe, RefUnwindSafe, UnwindSafe}; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::Waker; @@ -27,6 +28,9 @@ pub(crate) struct AtomicWaker { waker: UnsafeCell<Option<Waker>>, } +impl RefUnwindSafe for AtomicWaker {} +impl UnwindSafe for AtomicWaker {} + // `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell // stores a `Waker` value produced by calls to `register` and many threads can // race to take the waker by calling `wake`. @@ -84,7 +88,7 @@ pub(crate) struct AtomicWaker { // back to `WAITING`. This transition must succeed as, at this point, the state // cannot be transitioned by another thread. // -// If the thread is unable to obtain the lock, the `WAKING` bit is still. +// If the thread is unable to obtain the lock, the `WAKING` bit is still set. // This is because it has either been set by the current thread but the previous // value included the `REGISTERING` bit **or** a concurrent thread is in the // `WAKING` critical section. Either way, no action must be taken. @@ -123,7 +127,7 @@ pub(crate) struct AtomicWaker { // Thread A still holds the `wake` lock, the call to `register` will result // in the task waking itself and get scheduled again. -/// Idle state +/// Idle state. const WAITING: usize = 0; /// A new waker value is being registered with the `AtomicWaker` cell. @@ -171,6 +175,10 @@ impl AtomicWaker { where W: WakerRef, { + fn catch_unwind<F: FnOnce() -> R, R>(f: F) -> std::thread::Result<R> { + std::panic::catch_unwind(AssertUnwindSafe(f)) + } + match self .state .compare_exchange(WAITING, REGISTERING, Acquire, Acquire) @@ -178,8 +186,24 @@ impl AtomicWaker { { WAITING => { unsafe { - // Locked acquired, update the waker cell - self.waker.with_mut(|t| *t = Some(waker.into_waker())); + // If `into_waker` panics (because it's code outside of + // AtomicWaker) we need to prime a guard that is called on + // unwind to restore the waker to a WAITING state. Otherwise + // any future calls to register will incorrectly be stuck + // believing it's being updated by someone else. + let new_waker_or_panic = catch_unwind(move || waker.into_waker()); + + // Set the field to contain the new waker, or if + // `into_waker` panicked, leave the old value. + let mut maybe_panic = None; + let mut old_waker = None; + match new_waker_or_panic { + Ok(new_waker) => { + old_waker = self.waker.with_mut(|t| (*t).take()); + self.waker.with_mut(|t| *t = Some(new_waker)); + } + Err(panic) => maybe_panic = Some(panic), + } // Release the lock. If the state transitioned to include // the `WAKING` bit, this means that a wake has been @@ -193,33 +217,67 @@ impl AtomicWaker { .compare_exchange(REGISTERING, WAITING, AcqRel, Acquire); match res { - Ok(_) => {} + Ok(_) => { + // We don't want to give the caller the panic if it + // was someone else who put in that waker. + let _ = catch_unwind(move || { + drop(old_waker); + }); + } Err(actual) => { // This branch can only be reached if a // concurrent thread called `wake`. In this // case, `actual` **must** be `REGISTERING | - // `WAKING`. + // WAKING`. debug_assert_eq!(actual, REGISTERING | WAKING); // Take the waker to wake once the atomic operation has // completed. - let waker = self.waker.with_mut(|t| (*t).take()).unwrap(); + let mut waker = self.waker.with_mut(|t| (*t).take()); // Just swap, because no one could change state // while state == `Registering | `Waking` self.state.swap(WAITING, AcqRel); - // The atomic swap was complete, now - // wake the waker and return. - waker.wake(); + // If `into_waker` panicked, then the waker in the + // waker slot is actually the old waker. + if maybe_panic.is_some() { + old_waker = waker.take(); + } + + // We don't want to give the caller the panic if it + // was someone else who put in that waker. + if let Some(old_waker) = old_waker { + let _ = catch_unwind(move || { + old_waker.wake(); + }); + } + + // The atomic swap was complete, now wake the waker + // and return. + // + // If this panics, we end up in a consumed state and + // return the panic to the caller. + if let Some(waker) = waker { + debug_assert!(maybe_panic.is_none()); + waker.wake(); + } } } + + if let Some(panic) = maybe_panic { + // If `into_waker` panicked, return the panic to the caller. + resume_unwind(panic); + } } } WAKING => { // Currently in the process of waking the task, i.e., // `wake` is currently being called on the old waker. // So, we call wake on the new waker. + // + // If this panics, someone else is responsible for restoring the + // state of the waker. waker.wake(); // This is equivalent to a spin lock, so use a spin hint. @@ -245,6 +303,8 @@ impl AtomicWaker { /// If `register` has not been called yet, then this does nothing. pub(crate) fn wake(&self) { if let Some(waker) = self.take_waker() { + // If wake panics, we've consumed the waker which is a legitimate + // outcome. waker.wake(); } } diff --git a/src/sync/tests/atomic_waker.rs b/src/sync/tests/atomic_waker.rs index c832d62..b167a5d 100644 --- a/src/sync/tests/atomic_waker.rs +++ b/src/sync/tests/atomic_waker.rs @@ -32,3 +32,42 @@ fn wake_without_register() { assert!(!waker.is_woken()); } + +#[test] +fn atomic_waker_panic_safe() { + use std::panic; + use std::ptr; + use std::task::{RawWaker, RawWakerVTable, Waker}; + + static PANICKING_VTABLE: RawWakerVTable = RawWakerVTable::new( + |_| panic!("clone"), + |_| unimplemented!("wake"), + |_| unimplemented!("wake_by_ref"), + |_| (), + ); + + static NONPANICKING_VTABLE: RawWakerVTable = RawWakerVTable::new( + |_| RawWaker::new(ptr::null(), &NONPANICKING_VTABLE), + |_| unimplemented!("wake"), + |_| unimplemented!("wake_by_ref"), + |_| (), + ); + + let panicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &PANICKING_VTABLE)) }; + let nonpanicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NONPANICKING_VTABLE)) }; + + let atomic_waker = AtomicWaker::new(); + + let panicking = panic::AssertUnwindSafe(&panicking); + + let result = panic::catch_unwind(|| { + let panic::AssertUnwindSafe(panicking) = panicking; + atomic_waker.register_by_ref(panicking); + }); + + assert!(result.is_err()); + assert!(atomic_waker.take_waker().is_none()); + + atomic_waker.register_by_ref(&nonpanicking); + assert!(atomic_waker.take_waker().is_some()); +} diff --git a/src/sync/tests/loom_atomic_waker.rs b/src/sync/tests/loom_atomic_waker.rs index c148bcb..f8bae65 100644 --- a/src/sync/tests/loom_atomic_waker.rs +++ b/src/sync/tests/loom_atomic_waker.rs @@ -43,3 +43,58 @@ fn basic_notification() { })); }); } + +#[test] +fn test_panicky_waker() { + use std::panic; + use std::ptr; + use std::task::{RawWaker, RawWakerVTable, Waker}; + + static PANICKING_VTABLE: RawWakerVTable = + RawWakerVTable::new(|_| panic!("clone"), |_| (), |_| (), |_| ()); + + let panicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &PANICKING_VTABLE)) }; + + // If you're working with this test (and I sure hope you never have to!), + // uncomment the following section because there will be a lot of panics + // which would otherwise log. + // + // We can't however leaved it uncommented, because it's global. + // panic::set_hook(Box::new(|_| ())); + + const NUM_NOTIFY: usize = 2; + + loom::model(move || { + let chan = Arc::new(Chan { + num: AtomicUsize::new(0), + task: AtomicWaker::new(), + }); + + for _ in 0..NUM_NOTIFY { + let chan = chan.clone(); + + thread::spawn(move || { + chan.num.fetch_add(1, Relaxed); + chan.task.wake(); + }); + } + + // Note: this panic should have no effect on the overall state of the + // waker and it should proceed as normal. + // + // A thread above might race to flag a wakeup, and a WAKING state will + // be preserved if this expected panic races with that so the below + // procedure should be allowed to continue uninterrupted. + let _ = panic::catch_unwind(|| chan.task.register_by_ref(&panicking)); + + block_on(poll_fn(move |cx| { + chan.task.register_by_ref(cx.waker()); + + if NUM_NOTIFY == chan.num.load(Relaxed) { + return Ready(()); + } + + Pending + })); + }); +} diff --git a/src/sync/tests/loom_oneshot.rs b/src/sync/tests/loom_oneshot.rs index 9729cfb..c5f7972 100644 --- a/src/sync/tests/loom_oneshot.rs +++ b/src/sync/tests/loom_oneshot.rs @@ -55,6 +55,35 @@ fn changing_rx_task() { }); } +#[test] +fn try_recv_close() { + // reproduces https://github.com/tokio-rs/tokio/issues/4225 + loom::model(|| { + let (tx, mut rx) = oneshot::channel(); + thread::spawn(move || { + let _ = tx.send(()); + }); + + rx.close(); + let _ = rx.try_recv(); + }) +} + +#[test] +fn recv_closed() { + // reproduces https://github.com/tokio-rs/tokio/issues/4225 + loom::model(|| { + let (tx, mut rx) = oneshot::channel(); + + thread::spawn(move || { + let _ = tx.send(1); + }); + + rx.close(); + let _ = block_on(rx); + }); +} + // TODO: Move this into `oneshot` proper. use std::future::Future; diff --git a/src/sync/tests/mod.rs b/src/sync/tests/mod.rs index c5d5601..ee76418 100644 --- a/src/sync/tests/mod.rs +++ b/src/sync/tests/mod.rs @@ -1,5 +1,6 @@ cfg_not_loom! { mod atomic_waker; + mod notify; mod semaphore_batch; } diff --git a/src/sync/tests/notify.rs b/src/sync/tests/notify.rs new file mode 100644 index 0000000..8c9a573 --- /dev/null +++ b/src/sync/tests/notify.rs @@ -0,0 +1,44 @@ +use crate::sync::Notify; +use std::future::Future; +use std::mem::ManuallyDrop; +use std::sync::Arc; +use std::task::{Context, RawWaker, RawWakerVTable, Waker}; + +#[test] +fn notify_clones_waker_before_lock() { + const VTABLE: &RawWakerVTable = &RawWakerVTable::new(clone_w, wake, wake_by_ref, drop_w); + + unsafe fn clone_w(data: *const ()) -> RawWaker { + let arc = ManuallyDrop::new(Arc::<Notify>::from_raw(data as *const Notify)); + // Or some other arbitrary code that shouldn't be executed while the + // Notify wait list is locked. + arc.notify_one(); + let _arc_clone: ManuallyDrop<_> = arc.clone(); + RawWaker::new(data, VTABLE) + } + + unsafe fn drop_w(data: *const ()) { + let _ = Arc::<Notify>::from_raw(data as *const Notify); + } + + unsafe fn wake(_data: *const ()) { + unreachable!() + } + + unsafe fn wake_by_ref(_data: *const ()) { + unreachable!() + } + + let notify = Arc::new(Notify::new()); + let notify2 = notify.clone(); + + let waker = + unsafe { Waker::from_raw(RawWaker::new(Arc::into_raw(notify2) as *const _, VTABLE)) }; + let mut cx = Context::from_waker(&waker); + + let future = notify.notified(); + pin!(future); + + // The result doesn't matter, we're just testing that we don't deadlock. + let _ = future.poll(&mut cx); +} diff --git a/src/sync/watch.rs b/src/sync/watch.rs index b5da218..7e45c11 100644 --- a/src/sync/watch.rs +++ b/src/sync/watch.rs @@ -58,6 +58,7 @@ use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::Ordering::Relaxed; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; +use std::mem; use std::ops; /// Receives values from the associated [`Sender`](struct@Sender). @@ -85,7 +86,7 @@ pub struct Sender<T> { shared: Arc<Shared<T>>, } -/// Returns a reference to the inner value +/// Returns a reference to the inner value. /// /// Outstanding borrows hold a read lock on the inner value. This means that /// long lived borrows could cause the produce half to block. It is recommended @@ -97,27 +98,27 @@ pub struct Ref<'a, T> { #[derive(Debug)] struct Shared<T> { - /// The most recent value + /// The most recent value. value: RwLock<T>, - /// The current version + /// The current version. /// /// The lowest bit represents a "closed" state. The rest of the bits /// represent the current version. state: AtomicState, - /// Tracks the number of `Receiver` instances + /// Tracks the number of `Receiver` instances. ref_count_rx: AtomicUsize, /// Notifies waiting receivers that the value changed. notify_rx: Notify, - /// Notifies any task listening for `Receiver` dropped events + /// Notifies any task listening for `Receiver` dropped events. notify_tx: Notify, } pub mod error { - //! Watch error types + //! Watch error types. use std::fmt; @@ -317,7 +318,7 @@ impl<T> Receiver<T> { Ref { inner } } - /// Wait for a change notification, then mark the newest value as seen. + /// Waits for a change notification, then marks the newest value as seen. /// /// If the newest value in the channel has not yet been marked seen when /// this method is called, the method marks that value seen and returns @@ -432,10 +433,31 @@ impl<T> Sender<T> { return Err(error::SendError(value)); } - { + self.send_replace(value); + Ok(()) + } + + /// Sends a new value via the channel, notifying all receivers and returning + /// the previous value in the channel. + /// + /// This can be useful for reusing the buffers inside a watched value. + /// Additionally, this method permits sending values even when there are no + /// receivers. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// let (tx, _rx) = watch::channel(1); + /// assert_eq!(tx.send_replace(2), 1); + /// assert_eq!(tx.send_replace(3), 2); + /// ``` + pub fn send_replace(&self, value: T) -> T { + let old = { // Acquire the write lock and update the value. let mut lock = self.shared.value.write().unwrap(); - *lock = value; + let old = mem::replace(&mut *lock, value); self.shared.state.increment_version(); @@ -445,12 +467,14 @@ impl<T> Sender<T> { // that receivers are able to figure out the version number of the // value they are currently looking at. drop(lock); - } + + old + }; // Notify all watchers self.shared.notify_rx.notify_waiters(); - Ok(()) + old } /// Returns a reference to the most recently sent value @@ -595,7 +619,7 @@ impl<T> Sender<T> { Receiver::from_shared(version, shared) } - /// Returns the number of receivers that currently exist + /// Returns the number of receivers that currently exist. /// /// # Examples /// diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 806dbbd..825f25f 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -112,27 +112,82 @@ cfg_rt! { /// still spawn additional threads for blocking operations. The basic /// scheduler's single thread is only used for asynchronous code. /// + /// # Related APIs and patterns for bridging asynchronous and blocking code + /// + /// In simple cases, it is sufficient to have the closure accept input + /// parameters at creation time and return a single value (or struct/tuple, etc.). + /// + /// For more complex situations in which it is desirable to stream data to or from + /// the synchronous context, the [`mpsc channel`] has `blocking_send` and + /// `blocking_recv` methods for use in non-async code such as the thread created + /// by `spawn_blocking`. + /// + /// Another option is [`SyncIoBridge`] for cases where the synchronous context + /// is operating on byte streams. For example, you might use an asynchronous + /// HTTP client such as [hyper] to fetch data, but perform complex parsing + /// of the payload body using a library written for synchronous I/O. + /// + /// Finally, see also [Bridging with sync code][bridgesync] for discussions + /// around the opposite case of using Tokio as part of a larger synchronous + /// codebase. + /// /// [`Builder`]: struct@crate::runtime::Builder /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code /// [rayon]: https://docs.rs/rayon + /// [`mpsc channel`]: crate::sync::mpsc + /// [`SyncIoBridge`]: https://docs.rs/tokio-util/0.6/tokio_util/io/struct.SyncIoBridge.html + /// [hyper]: https://docs.rs/hyper /// [`thread::spawn`]: fn@std::thread::spawn /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout + /// [bridgesync]: https://tokio.rs/tokio/topics/bridging /// /// # Examples /// + /// Pass an input value and receive result of computation: + /// /// ``` /// use tokio::task; /// /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{ + /// // Initial input + /// let mut v = "Hello, ".to_string(); /// let res = task::spawn_blocking(move || { - /// // do some compute-heavy work or call synchronous code - /// "done computing" + /// // Stand-in for compute-heavy work or using synchronous APIs + /// v.push_str("world"); + /// // Pass ownership of the value back to the asynchronous context + /// v /// }).await?; /// - /// assert_eq!(res, "done computing"); + /// // `res` is the value returned from the thread + /// assert_eq!(res.as_str(), "Hello, world"); /// # Ok(()) /// # } /// ``` + /// + /// Use a channel: + /// + /// ``` + /// use tokio::task; + /// use tokio::sync::mpsc; + /// + /// # async fn docs() { + /// let (tx, mut rx) = mpsc::channel(2); + /// let start = 5; + /// let worker = task::spawn_blocking(move || { + /// for x in 0..10 { + /// // Stand in for complex computation + /// tx.blocking_send(start + x).unwrap(); + /// } + /// }); + /// + /// let mut acc = 0; + /// while let Some(v) = rx.recv().await { + /// acc += v; + /// } + /// assert_eq!(acc, 95); + /// worker.await.unwrap(); + /// # } + /// ``` #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R> where diff --git a/src/task/builder.rs b/src/task/builder.rs index e46bdef..f991fc6 100644 --- a/src/task/builder.rs +++ b/src/task/builder.rs @@ -1,5 +1,4 @@ #![allow(unreachable_pub)] -use crate::util::error::CONTEXT_MISSING_ERROR; use crate::{runtime::context, task::JoinHandle}; use std::future::Future; @@ -98,8 +97,6 @@ impl<'a> Builder<'a> { Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, { - context::current() - .expect(CONTEXT_MISSING_ERROR) - .spawn_blocking_inner(function, self.name) + context::current().spawn_blocking_inner(function, self.name) } } diff --git a/src/task/local.rs b/src/task/local.rs index a28d793..4a5d313 100644 --- a/src/task/local.rs +++ b/src/task/local.rs @@ -211,10 +211,10 @@ cfg_rt! { /// [`task::spawn_local`]: fn@spawn_local /// [`mpsc`]: mod@crate::sync::mpsc pub struct LocalSet { - /// Current scheduler tick + /// Current scheduler tick. tick: Cell<u8>, - /// State available from thread-local + /// State available from thread-local. context: Context, /// This type should not be Send. @@ -222,7 +222,7 @@ cfg_rt! { } } -/// State available from the thread-local +/// State available from the thread-local. struct Context { /// Collection of all active tasks spawned onto this executor. owned: LocalOwnedTasks<Arc<Shared>>, @@ -236,10 +236,10 @@ struct Context { /// LocalSet state shared between threads. struct Shared { - /// Remote run queue sender + /// Remote run queue sender. queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>, - /// Wake the `LocalSet` task + /// Wake the `LocalSet` task. waker: AtomicWaker, } @@ -315,13 +315,13 @@ cfg_rt! { } } -/// Initial queue capacity +/// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -/// How often it check the remote queue first +/// How often it check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; impl LocalSet { @@ -466,7 +466,7 @@ impl LocalSet { rt.block_on(self.run_until(future)) } - /// Run a future to completion on the local set, returning its output. + /// Runs a future to completion on the local set, returning its output. /// /// This returns a future that runs the given future with a local set, /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures. @@ -505,7 +505,7 @@ impl LocalSet { run_until.await } - /// Tick the scheduler, returning whether the local future needs to be + /// Ticks the scheduler, returning whether the local future needs to be /// notified again. fn tick(&self) -> bool { for _ in 0..MAX_TASKS_PER_TICK { diff --git a/src/time/clock.rs b/src/time/clock.rs index fae5c76..41be9ba 100644 --- a/src/time/clock.rs +++ b/src/time/clock.rs @@ -57,11 +57,11 @@ cfg_test_util! { /// Instant to use as the clock's base instant. base: std::time::Instant, - /// Instant at which the clock was last unfrozen + /// Instant at which the clock was last unfrozen. unfrozen: Option<std::time::Instant>, } - /// Pause time + /// Pauses time. /// /// The current value of `Instant::now()` is saved and all subsequent calls /// to `Instant::now()` will return the saved value. The saved value can be @@ -101,7 +101,7 @@ cfg_test_util! { clock.pause(); } - /// Resume time + /// Resumes time. /// /// Clears the saved `Instant::now()` value. Subsequent calls to /// `Instant::now()` will return the value returned by the system call. @@ -121,7 +121,7 @@ cfg_test_util! { inner.unfrozen = Some(std::time::Instant::now()); } - /// Advance time. + /// Advances time. /// /// Increments the saved `Instant::now()` value by `duration`. Subsequent /// calls to `Instant::now()` will return the result of the increment. @@ -159,7 +159,7 @@ cfg_test_util! { crate::task::yield_now().await; } - /// Return the current instant, factoring in frozen time. + /// Returns the current instant, factoring in frozen time. pub(crate) fn now() -> Instant { if let Some(clock) = clock() { clock.now() @@ -169,7 +169,7 @@ cfg_test_util! { } impl Clock { - /// Return a new `Clock` instance that uses the current execution context's + /// Returns a new `Clock` instance that uses the current execution context's /// source of time. pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock { let now = std::time::Instant::now(); diff --git a/src/time/driver/entry.rs b/src/time/driver/entry.rs index 168e0b9..9e9f0dc 100644 --- a/src/time/driver/entry.rs +++ b/src/time/driver/entry.rs @@ -345,7 +345,7 @@ impl TimerShared { } } - /// Gets the cached time-of-expiration value + /// Gets the cached time-of-expiration value. pub(super) fn cached_when(&self) -> u64 { // Cached-when is only accessed under the driver lock, so we can use relaxed self.driver_state.0.cached_when.load(Ordering::Relaxed) diff --git a/src/time/driver/handle.rs b/src/time/driver/handle.rs index 77b4358..7aaf65a 100644 --- a/src/time/driver/handle.rs +++ b/src/time/driver/handle.rs @@ -16,17 +16,17 @@ impl Handle { Handle { time_source, inner } } - /// Returns the time source associated with this handle + /// Returns the time source associated with this handle. pub(super) fn time_source(&self) -> &ClockTime { &self.time_source } - /// Access the driver's inner structure + /// Access the driver's inner structure. pub(super) fn get(&self) -> &super::Inner { &*self.inner } - // Check whether the driver has been shutdown + /// Checks whether the driver has been shutdown. pub(super) fn is_shutdown(&self) -> bool { self.inner.is_shutdown() } diff --git a/src/time/driver/mod.rs b/src/time/driver/mod.rs index f611fbb..cf2290b 100644 --- a/src/time/driver/mod.rs +++ b/src/time/driver/mod.rs @@ -4,7 +4,7 @@ #![allow(unused_unsafe)] #![cfg_attr(not(feature = "rt"), allow(dead_code))] -//! Time driver +//! Time driver. mod entry; pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared}; @@ -83,13 +83,13 @@ use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; /// [interval]: crate::time::Interval #[derive(Debug)] pub(crate) struct Driver<P: Park + 'static> { - /// Timing backend in use + /// Timing backend in use. time_source: ClockTime, - /// Shared state + /// Shared state. handle: Handle, - /// Parker to delegate to + /// Parker to delegate to. park: P, // When `true`, a call to `park_timeout` should immediately return and time @@ -146,25 +146,25 @@ struct Inner { // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex pub(super) state: Mutex<InnerState>, - /// True if the driver is being shutdown + /// True if the driver is being shutdown. pub(super) is_shutdown: AtomicBool, } /// Time state shared which must be protected by a `Mutex` struct InnerState { - /// Timing backend in use + /// Timing backend in use. time_source: ClockTime, /// The last published timer `elapsed` value. elapsed: u64, - /// The earliest time at which we promise to wake up without unparking + /// The earliest time at which we promise to wake up without unparking. next_wake: Option<NonZeroU64>, - /// Timer wheel + /// Timer wheel. wheel: wheel::Wheel, - /// Unparker that can be used to wake the time driver + /// Unparker that can be used to wake the time driver. unpark: Box<dyn Unpark>, } diff --git a/src/time/driver/sleep.rs b/src/time/driver/sleep.rs index 4e9ed65..43ff694 100644 --- a/src/time/driver/sleep.rs +++ b/src/time/driver/sleep.rs @@ -1,11 +1,17 @@ use crate::time::driver::{Handle, TimerEntry}; use crate::time::{error::Error, Duration, Instant}; +use crate::util::trace; use pin_project_lite::pin_project; use std::future::Future; +use std::panic::Location; use std::pin::Pin; use std::task::{self, Poll}; +cfg_trace! { + use crate::time::driver::ClockTime; +} + /// Waits until `deadline` is reached. /// /// No work is performed while awaiting on the sleep future to complete. `Sleep` @@ -39,8 +45,9 @@ use std::task::{self, Poll}; /// [`interval`]: crate::time::interval() // Alias for old name in 0.x #[cfg_attr(docsrs, doc(alias = "delay_until"))] +#[cfg_attr(tokio_track_caller, track_caller)] pub fn sleep_until(deadline: Instant) -> Sleep { - Sleep::new_timeout(deadline) + return Sleep::new_timeout(deadline, trace::caller_location()); } /// Waits until `duration` has elapsed. @@ -82,10 +89,13 @@ pub fn sleep_until(deadline: Instant) -> Sleep { // Alias for old name in 0.x #[cfg_attr(docsrs, doc(alias = "delay_for"))] #[cfg_attr(docsrs, doc(alias = "wait"))] +#[cfg_attr(tokio_track_caller, track_caller)] pub fn sleep(duration: Duration) -> Sleep { + let location = trace::caller_location(); + match Instant::now().checked_add(duration) { - Some(deadline) => sleep_until(deadline), - None => sleep_until(Instant::far_future()), + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::new_timeout(Instant::far_future(), location), } } @@ -182,7 +192,7 @@ pin_project! { #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Sleep { - deadline: Instant, + inner: Inner, // The link between the `Sleep` instance and the timer that drives it. #[pin] @@ -190,21 +200,87 @@ pin_project! { } } +cfg_trace! { + #[derive(Debug)] + struct Inner { + deadline: Instant, + resource_span: tracing::Span, + async_op_span: tracing::Span, + time_source: ClockTime, + } +} + +cfg_not_trace! { + #[derive(Debug)] + struct Inner { + deadline: Instant, + } +} + impl Sleep { - pub(crate) fn new_timeout(deadline: Instant) -> Sleep { + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + pub(crate) fn new_timeout( + deadline: Instant, + location: Option<&'static Location<'static>>, + ) -> Sleep { let handle = Handle::current(); let entry = TimerEntry::new(&handle, deadline); - Sleep { deadline, entry } + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = { + let time_source = handle.time_source().clone(); + let deadline_tick = time_source.deadline_to_tick(deadline); + let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0); + + #[cfg(tokio_track_caller)] + let location = location.expect("should have location if tracking caller"); + + #[cfg(tokio_track_caller)] + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sleep", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + #[cfg(not(tokio_track_caller))] + let resource_span = + tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer"); + + let async_op_span = + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout"); + + tracing::trace!( + target: "runtime::resource::state_update", + parent: resource_span.id(), + duration = duration, + duration.unit = "ms", + duration.op = "override", + ); + + Inner { + deadline, + resource_span, + async_op_span, + time_source, + } + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = Inner { deadline }; + + Sleep { inner, entry } } - pub(crate) fn far_future() -> Sleep { - Self::new_timeout(Instant::far_future()) + pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep { + Self::new_timeout(Instant::far_future(), location) } /// Returns the instant at which the future will complete. pub fn deadline(&self) -> Instant { - self.deadline + self.inner.deadline } /// Returns `true` if `Sleep` has elapsed. @@ -244,37 +320,83 @@ impl Sleep { /// /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + self.reset_inner(deadline) + } + + fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { let me = self.project(); me.entry.reset(deadline); - *me.deadline = deadline; + (*me.inner).deadline = deadline; + + #[cfg(all(tokio_unstable, feature = "tracing"))] + { + me.inner.async_op_span = + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); + + tracing::trace!( + target: "runtime::resource::state_update", + parent: me.inner.resource_span.id(), + duration = { + let now = me.inner.time_source.now(); + let deadline_tick = me.inner.time_source.deadline_to_tick(deadline); + deadline_tick.checked_sub(now).unwrap_or(0) + }, + duration.unit = "ms", + duration.op = "override", + ); + } } - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { - let me = self.project(); + cfg_not_trace! { + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { + let me = self.project(); - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); - me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) + me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }) + } + } + + cfg_trace! { + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> { + let me = self.project(); + // Keep track of task budget + let coop = ready!(trace_poll_op!( + "poll_elapsed", + crate::coop::poll_proceed(cx), + me.inner.resource_span.id(), + )); + + let result = me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }); + + trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id()) + } } } impl Future for Sleep { type Output = (); + // `poll_elapsed` can return an error in two cases: + // + // - AtCapacity: this is a pathological case where far too many + // sleep instances have been scheduled. + // - Shutdown: No timer has been setup, which is a mis-use error. + // + // Both cases are extremely rare, and pretty accurately fit into + // "logic errors", so we just panic in this case. A user couldn't + // really do much better if we passed the error onwards. fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { - // `poll_elapsed` can return an error in two cases: - // - // - AtCapacity: this is a pathological case where far too many - // sleep instances have been scheduled. - // - Shutdown: No timer has been setup, which is a mis-use error. - // - // Both cases are extremely rare, and pretty accurately fit into - // "logic errors", so we just panic in this case. A user couldn't - // really do much better if we passed the error onwards. + #[cfg(all(tokio_unstable, feature = "tracing"))] + let _span = self.inner.async_op_span.clone().entered(); + match ready!(self.as_mut().poll_elapsed(cx)) { Ok(()) => Poll::Ready(()), Err(e) => panic!("timer error: {}", e), diff --git a/src/time/driver/wheel/level.rs b/src/time/driver/wheel/level.rs index 81d6b58..34d3176 100644 --- a/src/time/driver/wheel/level.rs +++ b/src/time/driver/wheel/level.rs @@ -250,7 +250,7 @@ fn level_range(level: usize) -> u64 { LEVEL_MULT as u64 * slot_range(level) } -/// Convert a duration (milliseconds) and a level to a slot position +/// Converts a duration (milliseconds) and a level to a slot position. fn slot_for(duration: u64, level: usize) -> usize { ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize } diff --git a/src/time/driver/wheel/mod.rs b/src/time/driver/wheel/mod.rs index 5a40f6d..f088f2c 100644 --- a/src/time/driver/wheel/mod.rs +++ b/src/time/driver/wheel/mod.rs @@ -46,11 +46,11 @@ pub(crate) struct Wheel { /// precision of 1 millisecond. const NUM_LEVELS: usize = 6; -/// The maximum duration of a `Sleep` +/// The maximum duration of a `Sleep`. pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; impl Wheel { - /// Create a new timing wheel + /// Creates a new timing wheel. pub(crate) fn new() -> Wheel { let levels = (0..NUM_LEVELS).map(Level::new).collect(); @@ -61,13 +61,13 @@ impl Wheel { } } - /// Return the number of milliseconds that have elapsed since the timing + /// Returns the number of milliseconds that have elapsed since the timing /// wheel's creation. pub(crate) fn elapsed(&self) -> u64 { self.elapsed } - /// Insert an entry into the timing wheel. + /// Inserts an entry into the timing wheel. /// /// # Arguments /// @@ -115,7 +115,7 @@ impl Wheel { Ok(when) } - /// Remove `item` from the timing wheel. + /// Removes `item` from the timing wheel. pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) { unsafe { let when = item.as_ref().cached_when(); @@ -136,7 +136,7 @@ impl Wheel { } } - /// Instant at which to poll + /// Instant at which to poll. pub(crate) fn poll_at(&self) -> Option<u64> { self.next_expiration().map(|expiration| expiration.deadline) } diff --git a/src/time/driver/wheel/stack.rs b/src/time/driver/wheel/stack.rs index e7ed137..80651c3 100644 --- a/src/time/driver/wheel/stack.rs +++ b/src/time/driver/wheel/stack.rs @@ -3,7 +3,7 @@ use crate::time::driver::Entry; use std::ptr; -/// A doubly linked stack +/// A doubly linked stack. #[derive(Debug)] pub(crate) struct Stack { head: Option<OwnedItem>, @@ -50,7 +50,7 @@ impl Stack { self.head = Some(entry); } - /// Pops an item from the stack + /// Pops an item from the stack. pub(crate) fn pop(&mut self) -> Option<OwnedItem> { let entry = self.head.take(); diff --git a/src/time/error.rs b/src/time/error.rs index 8674feb..63f0a3b 100644 --- a/src/time/error.rs +++ b/src/time/error.rs @@ -40,7 +40,7 @@ impl From<Kind> for Error { } } -/// Error returned by `Timeout`. +/// Errors returned by `Timeout`. #[derive(Debug, PartialEq)] pub struct Elapsed(()); @@ -72,7 +72,7 @@ impl Error { matches!(self.0, Kind::AtCapacity) } - /// Create an error representing a misconfigured timer. + /// Creates an error representing a misconfigured timer. pub fn invalid() -> Error { Error(Invalid) } diff --git a/src/time/interval.rs b/src/time/interval.rs index a63e47b..7e07e51 100644 --- a/src/time/interval.rs +++ b/src/time/interval.rs @@ -147,7 +147,7 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval { /// milliseconds. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum MissedTickBehavior { - /// Tick as fast as possible until caught up. + /// Ticks as fast as possible until caught up. /// /// When this strategy is used, [`Interval`] schedules ticks "normally" (the /// same as it would have if the ticks hadn't been delayed), which results @@ -252,7 +252,7 @@ pub enum MissedTickBehavior { /// [`tick`]: Interval::tick Delay, - /// Skip missed ticks and tick on the next multiple of `period` from + /// Skips missed ticks and tick on the next multiple of `period` from /// `start`. /// /// When this strategy is used, [`Interval`] schedules the next tick to fire @@ -342,7 +342,7 @@ impl Default for MissedTickBehavior { } } -/// Interval returned by [`interval`] and [`interval_at`] +/// Interval returned by [`interval`] and [`interval_at`]. /// /// This type allows you to wait on a sequence of instants with a certain /// duration between each instant. Unlike calling [`sleep`] in a loop, this lets @@ -367,6 +367,11 @@ pub struct Interval { impl Interval { /// Completes when the next instant in the interval has been reached. /// + /// # Cancel safety + /// + /// This method is cancellation safe. If `tick` is used as the branch in a `tokio::select!` and + /// another branch completes first, then no tick has been consumed. + /// /// # Examples /// /// ``` @@ -389,7 +394,7 @@ impl Interval { poll_fn(|cx| self.poll_tick(cx)).await } - /// Poll for the next instant in the interval to be reached. + /// Polls for the next instant in the interval to be reached. /// /// This method can return the following values: /// diff --git a/src/time/timeout.rs b/src/time/timeout.rs index 61964ad..6725caa 100644 --- a/src/time/timeout.rs +++ b/src/time/timeout.rs @@ -4,14 +4,17 @@ //! //! [`Timeout`]: struct@Timeout -use crate::time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}; +use crate::{ + time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, + util::trace, +}; use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{self, Poll}; -/// Require a `Future` to complete before the specified duration has elapsed. +/// Requires a `Future` to complete before the specified duration has elapsed. /// /// If the future completes before the duration has elapsed, then the completed /// value is returned. Otherwise, an error is returned and the future is @@ -45,19 +48,22 @@ use std::task::{self, Poll}; /// } /// # } /// ``` +#[cfg_attr(tokio_track_caller, track_caller)] pub fn timeout<T>(duration: Duration, future: T) -> Timeout<T> where T: Future, { + let location = trace::caller_location(); + let deadline = Instant::now().checked_add(duration); let delay = match deadline { - Some(deadline) => Sleep::new_timeout(deadline), - None => Sleep::far_future(), + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::far_future(location), }; Timeout::new_with_delay(future, delay) } -/// Require a `Future` to complete before the specified instant in time. +/// Requires a `Future` to complete before the specified instant in time. /// /// If the future completes before the instant is reached, then the completed /// value is returned. Otherwise, an error is returned. diff --git a/src/util/bit.rs b/src/util/bit.rs index 392a0e8..a43c2c2 100644 --- a/src/util/bit.rs +++ b/src/util/bit.rs @@ -27,7 +27,7 @@ impl Pack { pointer_width() - (self.mask >> self.shift).leading_zeros() } - /// Max representable value + /// Max representable value. pub(crate) const fn max_value(&self) -> usize { (1 << self.width()) - 1 } @@ -60,7 +60,7 @@ impl fmt::Debug for Pack { } } -/// Returns the width of a pointer in bits +/// Returns the width of a pointer in bits. pub(crate) const fn pointer_width() -> u32 { std::mem::size_of::<usize>() as u32 * 8 } @@ -71,7 +71,7 @@ pub(crate) const fn mask_for(n: u32) -> usize { shift | (shift - 1) } -/// Unpack a value using a mask & shift +/// Unpacks a value using a mask & shift. pub(crate) const fn unpack(src: usize, mask: usize, shift: u32) -> usize { (src & mask) >> shift } diff --git a/src/util/error.rs b/src/util/error.rs index 0e52364..8f252c0 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -7,3 +7,11 @@ pub(crate) const CONTEXT_MISSING_ERROR: &str = /// Error string explaining that the Tokio context is shutting down and cannot drive timers. pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str = "A Tokio 1.x context was found, but it is being shutdown."; + +// some combinations of features might not use this +#[allow(dead_code)] +/// Error string explaining that the Tokio context is not available because the +/// thread-local storing it has been destroyed. This usually only happens during +/// destructors of other thread-locals. +pub(crate) const THREAD_LOCAL_DESTROYED_ERROR: &str = + "The Tokio context thread-local variable has been destroyed."; diff --git a/src/util/linked_list.rs b/src/util/linked_list.rs index 1eab81c..894d216 100644 --- a/src/util/linked_list.rs +++ b/src/util/linked_list.rs @@ -1,6 +1,6 @@ #![cfg_attr(not(feature = "full"), allow(dead_code))] -//! An intrusive double linked list of data +//! An intrusive double linked list of data. //! //! The data structure supports tracking pinned nodes. Most of the data //! structure's APIs are `unsafe` as they require the caller to ensure the @@ -46,10 +46,10 @@ pub(crate) unsafe trait Link { /// This is usually a pointer-ish type. type Handle; - /// Node type + /// Node type. type Target; - /// Convert the handle to a raw pointer without consuming the handle + /// Convert the handle to a raw pointer without consuming the handle. #[allow(clippy::wrong_self_convention)] fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target>; @@ -60,7 +60,7 @@ pub(crate) unsafe trait Link { unsafe fn pointers(target: NonNull<Self::Target>) -> NonNull<Pointers<Self::Target>>; } -/// Previous / next pointers +/// Previous / next pointers. pub(crate) struct Pointers<T> { inner: UnsafeCell<PointersInner<T>>, } diff --git a/src/util/rand.rs b/src/util/rand.rs index 17b3ec1..6b19c8b 100644 --- a/src/util/rand.rs +++ b/src/util/rand.rs @@ -1,6 +1,6 @@ use std::cell::Cell; -/// Fast random number generate +/// Fast random number generate. /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's @@ -14,7 +14,7 @@ pub(crate) struct FastRand { } impl FastRand { - /// Initialize a new, thread-local, fast random number generator. + /// Initializes a new, thread-local, fast random number generator. pub(crate) fn new(seed: u64) -> FastRand { let one = (seed >> 32) as u32; let mut two = seed as u32; diff --git a/src/util/slab.rs b/src/util/slab.rs index 2ddaa6c..97355d5 100644 --- a/src/util/slab.rs +++ b/src/util/slab.rs @@ -85,11 +85,11 @@ pub(crate) struct Address(usize); /// An entry in the slab. pub(crate) trait Entry: Default { - /// Reset the entry's value and track the generation. + /// Resets the entry's value and track the generation. fn reset(&self); } -/// A reference to a value stored in the slab +/// A reference to a value stored in the slab. pub(crate) struct Ref<T> { value: *const Value<T>, } @@ -101,9 +101,9 @@ const NUM_PAGES: usize = 19; const PAGE_INITIAL_SIZE: usize = 32; const PAGE_INDEX_SHIFT: u32 = PAGE_INITIAL_SIZE.trailing_zeros() + 1; -/// A page in the slab +/// A page in the slab. struct Page<T> { - /// Slots + /// Slots. slots: Mutex<Slots<T>>, // Number of slots currently being used. This is not guaranteed to be up to @@ -116,7 +116,7 @@ struct Page<T> { // The number of slots the page can hold. len: usize, - // Length of all previous pages combined + // Length of all previous pages combined. prev_len: usize, } @@ -128,9 +128,9 @@ struct CachedPage<T> { init: usize, } -/// Page state +/// Page state. struct Slots<T> { - /// Slots + /// Slots. slots: Vec<Slot<T>>, head: usize, @@ -159,9 +159,9 @@ struct Slot<T> { next: u32, } -/// Value paired with a reference to the page +/// Value paired with a reference to the page. struct Value<T> { - /// Value stored in the value + /// Value stored in the value. value: T, /// Pointer to the page containing the slot. @@ -171,7 +171,7 @@ struct Value<T> { } impl<T> Slab<T> { - /// Create a new, empty, slab + /// Create a new, empty, slab. pub(crate) fn new() -> Slab<T> { // Initializing arrays is a bit annoying. Instead of manually writing // out an array and every single entry, `Default::default()` is used to @@ -455,7 +455,7 @@ impl<T> Page<T> { addr.0 - self.prev_len } - /// Returns the address for the given slot + /// Returns the address for the given slot. fn addr(&self, slot: usize) -> Address { Address(slot + self.prev_len) } @@ -478,7 +478,7 @@ impl<T> Default for Page<T> { } impl<T> Page<T> { - /// Release a slot into the page's free list + /// Release a slot into the page's free list. fn release(&self, value: *const Value<T>) { let mut locked = self.slots.lock(); @@ -492,7 +492,7 @@ impl<T> Page<T> { } impl<T> CachedPage<T> { - /// Refresh the cache + /// Refreshes the cache. fn refresh(&mut self, page: &Page<T>) { let slots = page.slots.lock(); @@ -502,7 +502,7 @@ impl<T> CachedPage<T> { } } - // Get a value by index + /// Gets a value by index. fn get(&self, idx: usize) -> &T { assert!(idx < self.init); @@ -576,7 +576,7 @@ impl<T: Entry> Slot<T> { } impl<T> Value<T> { - // Release the slot, returning the `Arc<Page<T>>` logically owned by the ref. + /// Releases the slot, returning the `Arc<Page<T>>` logically owned by the ref. fn release(&self) -> Arc<Page<T>> { // Safety: called by `Ref`, which owns an `Arc<Page<T>>` instance. let page = unsafe { Arc::from_raw(self.page) }; diff --git a/src/util/trace.rs b/src/util/trace.rs index 61c155c..e3c26f9 100644 --- a/src/util/trace.rs +++ b/src/util/trace.rs @@ -14,7 +14,9 @@ cfg_trace! { "runtime.spawn", %kind, task.name = %name.unwrap_or_default(), - spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), ); #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( @@ -27,6 +29,15 @@ cfg_trace! { } } } +cfg_time! { + #[cfg_attr(tokio_track_caller, track_caller)] + pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> { + #[cfg(all(tokio_track_caller, tokio_unstable, feature = "tracing"))] + return Some(std::panic::Location::caller()); + #[cfg(not(all(tokio_track_caller, tokio_unstable, feature = "tracing")))] + None + } +} cfg_not_trace! { cfg_rt! { diff --git a/src/util/vec_deque_cell.rs b/src/util/vec_deque_cell.rs index 12883ab..b4e124c 100644 --- a/src/util/vec_deque_cell.rs +++ b/src/util/vec_deque_cell.rs @@ -45,7 +45,7 @@ impl<T> VecDequeCell<T> { } } - /// Replace the inner VecDeque with an empty VecDeque and return the current + /// Replaces the inner VecDeque with an empty VecDeque and return the current /// contents. pub(crate) fn take(&self) -> VecDeque<T> { unsafe { self.with_inner(|inner| std::mem::take(inner)) } diff --git a/src/util/wake.rs b/src/util/wake.rs index 5773937..8f89668 100644 --- a/src/util/wake.rs +++ b/src/util/wake.rs @@ -4,12 +4,12 @@ use std::ops::Deref; use std::sync::Arc; use std::task::{RawWaker, RawWakerVTable, Waker}; -/// Simplified waking interface based on Arcs +/// Simplified waking interface based on Arcs. pub(crate) trait Wake: Send + Sync { - /// Wake by value + /// Wake by value. fn wake(self: Arc<Self>); - /// Wake by reference + /// Wake by reference. fn wake_by_ref(arc_self: &Arc<Self>); } diff --git a/tests/macros_select.rs b/tests/macros_select.rs index 4da88fb..b4f8544 100644 --- a/tests/macros_select.rs +++ b/tests/macros_select.rs @@ -558,3 +558,29 @@ pub async fn default_numeric_fallback() { else => (), } } + +// https://github.com/tokio-rs/tokio/issues/4182 +#[tokio::test] +async fn mut_ref_patterns() { + tokio::select! { + Some(mut foo) = async { Some("1".to_string()) } => { + assert_eq!(foo, "1"); + foo = "2".to_string(); + assert_eq!(foo, "2"); + }, + }; + + tokio::select! { + Some(ref foo) = async { Some("1".to_string()) } => { + assert_eq!(*foo, "1"); + }, + }; + + tokio::select! { + Some(ref mut foo) = async { Some("1".to_string()) } => { + assert_eq!(*foo, "1"); + *foo = "2".to_string(); + assert_eq!(*foo, "2"); + }, + }; +} diff --git a/tests/macros_test.rs b/tests/macros_test.rs index 7212c7b..bca2c91 100644 --- a/tests/macros_test.rs +++ b/tests/macros_test.rs @@ -30,3 +30,19 @@ fn trait_method() { } ().f() } + +// https://github.com/tokio-rs/tokio/issues/4175 +#[tokio::main] +pub async fn issue_4175_main_1() -> ! { + panic!(); +} +#[tokio::main] +pub async fn issue_4175_main_2() -> std::io::Result<()> { + panic!(); +} +#[allow(unreachable_code)] +#[tokio::test] +pub async fn issue_4175_test() -> std::io::Result<()> { + return Ok(()); + panic!(); +} diff --git a/tests/rt_basic.rs b/tests/rt_basic.rs index 4b1bdad..70056b1 100644 --- a/tests/rt_basic.rs +++ b/tests/rt_basic.rs @@ -3,10 +3,14 @@ use tokio::runtime::Runtime; use tokio::sync::oneshot; +use tokio::time::{timeout, Duration}; use tokio_test::{assert_err, assert_ok}; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::task::{Context, Poll}; use std::thread; -use tokio::time::{timeout, Duration}; mod support { pub(crate) mod mpsc_stream; @@ -136,6 +140,35 @@ fn acquire_mutex_in_drop() { } #[test] +fn drop_tasks_in_context() { + static SUCCESS: AtomicBool = AtomicBool::new(false); + + struct ContextOnDrop; + + impl Future for ContextOnDrop { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } + } + + impl Drop for ContextOnDrop { + fn drop(&mut self) { + if tokio::runtime::Handle::try_current().is_ok() { + SUCCESS.store(true, Ordering::SeqCst); + } + } + } + + let rt = rt(); + rt.spawn(ContextOnDrop); + drop(rt); + + assert!(SUCCESS.load(Ordering::SeqCst)); +} + +#[test] #[should_panic( expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers." )] diff --git a/tests/rt_threaded.rs b/tests/rt_threaded.rs index 9e76c4e..5f047a7 100644 --- a/tests/rt_threaded.rs +++ b/tests/rt_threaded.rs @@ -54,6 +54,7 @@ fn many_oneshot_futures() { drop(rt); } } + #[test] fn many_multishot_futures() { const CHAIN: usize = 200; @@ -473,6 +474,30 @@ fn wake_during_shutdown() { rt.block_on(async { tokio::time::sleep(tokio::time::Duration::from_millis(20)).await }); } +#[should_panic] +#[tokio::test] +async fn test_block_in_place1() { + tokio::task::block_in_place(|| {}); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_block_in_place2() { + tokio::task::block_in_place(|| {}); +} + +#[should_panic] +#[tokio::main(flavor = "current_thread")] +#[test] +async fn test_block_in_place3() { + tokio::task::block_in_place(|| {}); +} + +#[tokio::main] +#[test] +async fn test_block_in_place4() { + tokio::task::block_in_place(|| {}); +} + fn rt() -> Runtime { Runtime::new().unwrap() } diff --git a/tests/udp.rs b/tests/udp.rs index 715d8eb..ec2a1e9 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -5,6 +5,7 @@ use futures::future::poll_fn; use std::io; use std::sync::Arc; use tokio::{io::ReadBuf, net::UdpSocket}; +use tokio_test::assert_ok; const MSG: &[u8] = b"hello"; const MSG_LEN: usize = MSG.len(); @@ -440,3 +441,46 @@ async fn try_recv_buf_from() { } } } + +#[tokio::test] +async fn poll_ready() { + // Create listener + let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let saddr = server.local_addr().unwrap(); + + // Create socket pair + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let caddr = client.local_addr().unwrap(); + + for _ in 0..5 { + loop { + assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await); + + match client.try_send_to(b"hello world", saddr) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await); + + let mut buf = Vec::with_capacity(512); + + match server.try_recv_buf_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr, caddr); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } +} diff --git a/tests/uds_datagram.rs b/tests/uds_datagram.rs index 4d28468..5e5486b 100644 --- a/tests/uds_datagram.rs +++ b/tests/uds_datagram.rs @@ -328,3 +328,50 @@ async fn try_recv_buf_never_block() -> io::Result<()> { Ok(()) } + +#[tokio::test] +async fn poll_ready() -> io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let server_path = dir.path().join("server.sock"); + let client_path = dir.path().join("client.sock"); + + // Create listener + let server = UnixDatagram::bind(&server_path)?; + + // Create socket pair + let client = UnixDatagram::bind(&client_path)?; + + for _ in 0..5 { + loop { + poll_fn(|cx| client.poll_send_ready(cx)).await?; + + match client.try_send_to(b"hello world", &server_path) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + poll_fn(|cx| server.poll_recv_ready(cx)).await?; + + let mut buf = Vec::with_capacity(512); + + match server.try_recv_buf_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr.as_pathname(), Some(client_path.as_ref())); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } + + Ok(()) +} |