From a77df93fec1b865e952e7a931ea3cd129aca6a46 Mon Sep 17 00:00:00 2001 From: Jeff Vander Stoep Date: Mon, 6 Feb 2023 09:15:26 +0100 Subject: Upgrade tokio to 1.25.0 This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update rust/crates/tokio For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md Test: TreeHugger Change-Id: Ibe4881eebae2509ecb442aa1c803461d9348eaa3 --- .cargo_vcs_info.json | 2 +- Android.bp | 2 +- CHANGELOG.md | 112 ++++++++- Cargo.toml | 6 +- Cargo.toml.orig | 6 +- LICENSE | 2 +- METADATA | 10 +- README.md | 2 +- build.rs | 53 +++++ docs/reactor-refactor.md | 4 +- patches/panic_unwind_tests.patch | 45 +++- src/fs/file/tests.rs | 8 +- src/fs/read_dir.rs | 91 +++++-- src/io/blocking.rs | 2 +- src/io/split.rs | 5 +- src/io/stdio_common.rs | 3 +- src/io/util/read.rs | 2 +- src/lib.rs | 15 +- src/loom/mocked.rs | 7 + src/loom/std/atomic_u64.rs | 9 +- src/loom/std/atomic_u64_as_mutex.rs | 18 +- src/loom/std/atomic_u64_native.rs | 4 + src/loom/std/atomic_u64_static_const_new.rs | 12 + src/loom/std/atomic_u64_static_once_cell.rs | 57 +++++ src/loom/std/mod.rs | 24 +- src/macros/cfg.rs | 30 +-- src/net/tcp/listener.rs | 13 +- src/net/tcp/socket.rs | 11 +- src/net/tcp/split_owned.rs | 4 +- src/net/tcp/stream.rs | 13 +- src/net/udp.rs | 13 +- src/net/unix/datagram/socket.rs | 31 ++- src/net/unix/listener.rs | 46 +++- src/net/unix/split_owned.rs | 4 +- src/net/unix/stream.rs | 28 ++- src/net/windows/named_pipe.rs | 54 ++++- src/process/unix/mod.rs | 2 +- src/process/unix/orphan.rs | 2 +- src/runtime/blocking/mod.rs | 2 - src/runtime/blocking/pool.rs | 9 +- src/runtime/blocking/schedule.rs | 49 +++- src/runtime/builder.rs | 26 ++ src/runtime/context.rs | 22 +- src/runtime/driver.rs | 9 +- src/runtime/handle.rs | 6 +- src/runtime/io/mod.rs | 16 +- src/runtime/io/registration.rs | 26 +- src/runtime/io/scheduled_io.rs | 43 ++-- src/runtime/metrics/batch.rs | 13 +- src/runtime/metrics/mock.rs | 1 + src/runtime/metrics/runtime.rs | 59 ++++- src/runtime/metrics/worker.rs | 6 +- src/runtime/mod.rs | 3 + src/runtime/runtime.rs | 6 +- src/runtime/scheduler/multi_thread/queue.rs | 3 +- src/runtime/task/harness.rs | 49 ++-- src/runtime/task/id.rs | 87 +++++++ src/runtime/task/join.rs | 6 +- src/runtime/task/mod.rs | 136 +---------- src/runtime/task/state.rs | 8 +- src/runtime/tests/loom_blocking.rs | 21 ++ src/runtime/tests/loom_queue.rs | 2 +- src/runtime/tests/mod.rs | 20 +- src/runtime/tests/task.rs | 2 +- src/runtime/thread_id.rs | 31 +++ src/runtime/time/mod.rs | 2 +- src/signal/registry.rs | 5 +- src/signal/unix.rs | 3 +- src/sync/broadcast.rs | 98 +++++++- src/sync/mod.rs | 8 + src/sync/mpsc/block.rs | 143 +++++++---- src/sync/mpsc/list.rs | 2 +- src/sync/mpsc/mod.rs | 6 +- src/task/join_set.rs | 24 +- src/task/local.rs | 47 ++-- src/task/spawn.rs | 6 +- src/time/clock.rs | 19 +- src/time/sleep.rs | 2 +- src/util/linked_list.rs | 2 +- src/util/mod.rs | 7 +- src/util/once_cell.rs | 4 +- tests/_require_full.rs | 8 +- tests/buffered.rs | 4 +- tests/io_driver.rs | 2 +- tests/macros_join.rs | 2 +- tests/macros_select.rs | 2 +- tests/macros_try_join.rs | 2 +- tests/rt_common.rs | 2 +- tests/rt_metrics.rs | 19 +- tests/support/leaked_buffers.rs | 6 +- tests/support/panic.rs | 8 +- tests/sync_broadcast.rs | 60 +++++ tests/sync_once_cell.rs | 355 ++++++++++++++-------------- tests/task_blocking.rs | 83 ++++++- tests/task_join_set.rs | 95 ++++---- tests/tcp_peek.rs | 2 +- 96 files changed, 1716 insertions(+), 725 deletions(-) create mode 100644 src/loom/std/atomic_u64_native.rs create mode 100644 src/loom/std/atomic_u64_static_const_new.rs create mode 100644 src/loom/std/atomic_u64_static_once_cell.rs create mode 100644 src/runtime/task/id.rs create mode 100644 src/runtime/thread_id.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 624eb53..fb46d1e 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "3ce5a2681c734e134c2aa6d6cf91b8d2631bd82b" + "sha1": "88b1eb54fb66461b9f3524f4b5316241a019279a" }, "path_in_vcs": "tokio" } \ No newline at end of file diff --git a/Android.bp b/Android.bp index c430388..7d066e6 100644 --- a/Android.bp +++ b/Android.bp @@ -23,7 +23,7 @@ rust_library { host_supported: true, crate_name: "tokio", cargo_env_compat: true, - cargo_pkg_version: "1.23.0", + cargo_pkg_version: "1.25.0", srcs: ["src/lib.rs"], edition: "2018", features: [ diff --git a/CHANGELOG.md b/CHANGELOG.md index f930dfc..39a57fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,75 @@ +# 1.25.0 (January 28, 2023) + +### Fixed + +- rt: fix runtime metrics reporting ([#5330]) + +### Added + +- sync: add `broadcast::Sender::len` ([#5343]) + +### Changed + +- fs: increase maximum read buffer size to 2MiB ([#5397]) + +[#5330]: https://github.com/tokio-rs/tokio/pull/5330 +[#5343]: https://github.com/tokio-rs/tokio/pull/5343 +[#5397]: https://github.com/tokio-rs/tokio/pull/5397 + +# 1.24.2 (January 17, 2023) + +Forward ports 1.18.5 changes. + +### Fixed + +- io: fix unsoundness in `ReadHalf::unsplit` ([#5375]) + +[#5375]: https://github.com/tokio-rs/tokio/pull/5375 + +# 1.24.1 (January 6, 2022) + +This release fixes a compilation failure on targets without `AtomicU64` when using rustc older than 1.63. ([#5356]) + +[#5356]: https://github.com/tokio-rs/tokio/pull/5356 + +# 1.24.0 (January 5, 2022) + +### Fixed + - rt: improve native `AtomicU64` support detection ([#5284]) + +### Added + - rt: add configuration option for max number of I/O events polled from the OS + per tick ([#5186]) + - rt: add an environment variable for configuring the default number of worker + threads per runtime instance ([#4250]) + +### Changed + - sync: reduce MPSC channel stack usage ([#5294]) + - io: reduce lock contention in I/O operations ([#5300]) + - fs: speed up `read_dir()` by chunking operations ([#5309]) + - rt: use internal `ThreadId` implementation ([#5329]) + - test: don't auto-advance time when a `spawn_blocking` task is running ([#5115]) + +[#5186]: https://github.com/tokio-rs/tokio/pull/5186 +[#5294]: https://github.com/tokio-rs/tokio/pull/5294 +[#5284]: https://github.com/tokio-rs/tokio/pull/5284 +[#4250]: https://github.com/tokio-rs/tokio/pull/4250 +[#5300]: https://github.com/tokio-rs/tokio/pull/5300 +[#5329]: https://github.com/tokio-rs/tokio/pull/5329 +[#5115]: https://github.com/tokio-rs/tokio/pull/5115 +[#5309]: https://github.com/tokio-rs/tokio/pull/5309 + +# 1.23.1 (January 4, 2022) + +This release forward ports changes from 1.18.4. + +### Fixed + +- net: fix Windows named pipe server builder to maintain option when toggling + pipe mode ([#5336]). + +[#5336]: https://github.com/tokio-rs/tokio/pull/5336 + # 1.23.0 (December 5, 2022) ### Fixed @@ -262,6 +334,27 @@ wasm32-wasi target is given unstable support for the `net` feature. [#4956]: https://github.com/tokio-rs/tokio/pull/4956 [#4959]: https://github.com/tokio-rs/tokio/pull/4959 +# 1.20.4 (January 17, 2023) + +Forward ports 1.18.5 changes. + +### Fixed + +- io: fix unsoundness in `ReadHalf::unsplit` ([#5375]) + +[#5375]: https://github.com/tokio-rs/tokio/pull/5375 + +# 1.20.3 (January 3, 2022) + +This release forward ports changes from 1.18.4. + +### Fixed + +- net: fix Windows named pipe server builder to maintain option when toggling + pipe mode ([#5336]). + +[#5336]: https://github.com/tokio-rs/tokio/pull/5336 + # 1.20.2 (September 27, 2022) This release removes the dependency on the `once_cell` crate to restore the MSRV @@ -387,6 +480,23 @@ This release fixes a bug in `Notified::enable`. ([#4747]) [#4729]: https://github.com/tokio-rs/tokio/pull/4729 [#4739]: https://github.com/tokio-rs/tokio/pull/4739 +# 1.18.5 (January 17, 2023) + +### Fixed + +- io: fix unsoundness in `ReadHalf::unsplit` ([#5375]) + +[#5375]: https://github.com/tokio-rs/tokio/pull/5375 + +# 1.18.4 (January 3, 2022) + +### Fixed + +- net: fix Windows named pipe server builder to maintain option when toggling + pipe mode ([#5336]). + +[#5336]: https://github.com/tokio-rs/tokio/pull/5336 + # 1.18.3 (September 27, 2022) This release removes the dependency on the `once_cell` crate to restore the MSRV @@ -514,7 +624,7 @@ performance improvements. - time: use bit manipulation instead of modulo to improve performance ([#4480]) - net: use `std::future::Ready` instead of our own `Ready` future ([#4271]) - replace deprecated `atomic::spin_loop_hint` with `hint::spin_loop` ([#4491]) -- fix miri failures in intrusive linked lists ([#4397]) +- fix miri failures in intrusive linked lists ([#4397]) ### Documented diff --git a/Cargo.toml b/Cargo.toml index 4c10770..2ea9473 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" rust-version = "1.49" name = "tokio" -version = "1.23.0" +version = "1.25.0" authors = ["Tokio Contributors "] description = """ An event-driven, non-blocking I/O platform for writing asynchronous I/O @@ -203,7 +203,7 @@ version = "1" version = "0.4" [target."cfg(target_os = \"freebsd\")".dev-dependencies.mio-aio] -version = "0.6.0" +version = "0.7.0" features = ["tokio"] [target."cfg(tokio_unstable)".dependencies.tracing] @@ -224,7 +224,7 @@ optional = true version = "0.2.42" [target."cfg(unix)".dev-dependencies.nix] -version = "0.24" +version = "0.26" features = [ "fs", "socket", diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 78cba81..0f6d30a 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -6,7 +6,7 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.x.y" git tag. -version = "1.23.0" +version = "1.25.0" edition = "2018" rust-version = "1.49" authors = ["Tokio Contributors "] @@ -122,7 +122,7 @@ signal-hook-registry = { version = "1.1.1", optional = true } [target.'cfg(unix)'.dev-dependencies] libc = { version = "0.2.42" } -nix = { version = "0.24", default-features = false, features = ["fs", "socket"] } +nix = { version = "0.26", default-features = false, features = ["fs", "socket"] } [target.'cfg(windows)'.dependencies.windows-sys] version = "0.42.0" @@ -157,7 +157,7 @@ rand = "0.8.0" wasm-bindgen-test = "0.3.0" [target.'cfg(target_os = "freebsd")'.dev-dependencies] -mio-aio = { version = "0.6.0", features = ["tokio"] } +mio-aio = { version = "0.7.0", features = ["tokio"] } [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.5.2", features = ["futures", "checkpoint"] } diff --git a/LICENSE b/LICENSE index 8af5baf..8bdf6bd 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2022 Tokio Contributors +Copyright (c) 2023 Tokio Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated diff --git a/METADATA b/METADATA index 4be4653..b8a5d8a 100644 --- a/METADATA +++ b/METADATA @@ -11,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio/tokio-1.23.0.crate" + value: "https://static.crates.io/crates/tokio/tokio-1.25.0.crate" } - version: "1.23.0" + version: "1.25.0" license_type: NOTICE last_upgrade_date { - year: 2022 - month: 12 - day: 12 + year: 2023 + month: 2 + day: 6 } } diff --git a/README.md b/README.md index 3e51cf5..462e6e8 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml: ```toml [dependencies] -tokio = { version = "1.23.0", features = ["full"] } +tokio = { version = "1.25.0", features = ["full"] } ``` Then, on your main.rs: diff --git a/build.rs b/build.rs index 93b0509..ddade28 100644 --- a/build.rs +++ b/build.rs @@ -24,10 +24,26 @@ const CONST_MUTEX_NEW_PROBE: &str = r#" } "#; +const TARGET_HAS_ATOMIC_PROBE: &str = r#" +{ + #[cfg(target_has_atomic = "ptr")] + let _ = (); +} +"#; + +const TARGET_ATOMIC_U64_PROBE: &str = r#" +{ + #[allow(unused_imports)] + use std::sync::atomic::AtomicU64 as _; +} +"#; + fn main() { let mut enable_const_thread_local = false; let mut enable_addr_of = false; + let mut enable_target_has_atomic = false; let mut enable_const_mutex_new = false; + let mut target_needs_atomic_u64_fallback = false; match AutoCfg::new() { Ok(ac) => { @@ -66,6 +82,27 @@ fn main() { } } + // The `target_has_atomic` cfg was stabilized in 1.60. + if ac.probe_rustc_version(1, 61) { + enable_target_has_atomic = true; + } else if ac.probe_rustc_version(1, 60) { + // This compiler claims to be 1.60, but there are some nightly + // compilers that claim to be 1.60 without supporting the + // feature. Explicitly probe to check if code using them + // compiles. + // + // The oldest nightly that supports the feature is 2022-02-11. + if ac.probe_expression(TARGET_HAS_ATOMIC_PROBE) { + enable_target_has_atomic = true; + } + } + + // If we can't tell using `target_has_atomic`, tell if the target + // has `AtomicU64` by trying to use it. + if !enable_target_has_atomic && !ac.probe_expression(TARGET_ATOMIC_U64_PROBE) { + target_needs_atomic_u64_fallback = true; + } + // The `Mutex::new` method was made const in 1.63. if ac.probe_rustc_version(1, 64) { enable_const_mutex_new = true; @@ -109,6 +146,14 @@ fn main() { autocfg::emit("tokio_no_addr_of") } + if !enable_target_has_atomic { + // To disable this feature on compilers that support it, you can + // explicitly pass this flag with the following environment variable: + // + // RUSTFLAGS="--cfg tokio_no_target_has_atomic" + autocfg::emit("tokio_no_target_has_atomic") + } + if !enable_const_mutex_new { // To disable this feature on compilers that support it, you can // explicitly pass this flag with the following environment variable: @@ -117,6 +162,14 @@ fn main() { autocfg::emit("tokio_no_const_mutex_new") } + if target_needs_atomic_u64_fallback { + // To disable this feature on compilers that support it, you can + // explicitly pass this flag with the following environment variable: + // + // RUSTFLAGS="--cfg tokio_no_atomic_u64" + autocfg::emit("tokio_no_atomic_u64") + } + let target = ::std::env::var("TARGET").unwrap_or_default(); // We emit cfgs instead of using `target_family = "wasm"` that requires Rust 1.54. diff --git a/docs/reactor-refactor.md b/docs/reactor-refactor.md index 3005afc..77e64f4 100644 --- a/docs/reactor-refactor.md +++ b/docs/reactor-refactor.md @@ -188,12 +188,12 @@ readiness, the driver's tick is packed into the atomic `usize`. The `ScheduledIo` readiness `AtomicUsize` is structured as: ``` -| reserved | generation | driver tick | readiness | +| shutdown | generation | driver tick | readiness | |----------+------------+--------------+-----------| | 1 bit | 7 bits + 8 bits + 16 bits | ``` -The `reserved` and `generation` components exist today. +The `shutdown` and `generation` components exist today. The `readiness()` function returns a `ReadyEvent` value. This value includes the `tick` component read with the resource's readiness value. When diff --git a/patches/panic_unwind_tests.patch b/patches/panic_unwind_tests.patch index adf431e..c11065b 100644 --- a/patches/panic_unwind_tests.patch +++ b/patches/panic_unwind_tests.patch @@ -1,24 +1,53 @@ +diff --git a/patches/panic_unwind_tests.patch b/patches/panic_unwind_tests.patch +index adf431e..e69de29 100644 +--- a/patches/panic_unwind_tests.patch ++++ b/patches/panic_unwind_tests.patch +@@ -1,24 +0,0 @@ +-diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs +-index 9aa3484..53ee7d8 100644 +---- a/tests/sync_broadcast.rs +-+++ b/tests/sync_broadcast.rs +-@@ -292,6 +292,7 @@ fn capacity_too_big() { +- +- #[test] +-+#[cfg(panic = "unwind")] +- #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding +- fn panic_in_clone() { +- use std::panic::{self, AssertUnwindSafe}; +- +-diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs +-index 34f9b78..e8eacce 100644 +---- a/tests/sync_watch.rs +-+++ b/tests/sync_watch.rs +-@@ -214,6 +214,7 @@ fn reopened_after_subscribe() { +- +- #[test] +-+#[cfg(panic = "unwind")] +- #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding +- fn send_modify_panic() { +- let (tx, mut rx) = watch::channel("one"); +- diff --git a/tests/sync_broadcast.rs b/tests/sync_broadcast.rs -index 9aa3484..53ee7d8 100644 +index 67c378b..cd66924 100644 --- a/tests/sync_broadcast.rs +++ b/tests/sync_broadcast.rs -@@ -292,6 +292,7 @@ fn capacity_too_big() { - +@@ -291,6 +291,7 @@ fn capacity_too_big() { + } + #[test] +#[cfg(panic = "unwind")] #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn panic_in_clone() { use std::panic::{self, AssertUnwindSafe}; - diff --git a/tests/sync_watch.rs b/tests/sync_watch.rs -index 34f9b78..e8eacce 100644 +index 34f9b78..d4f8ce8 100644 --- a/tests/sync_watch.rs +++ b/tests/sync_watch.rs -@@ -214,6 +214,7 @@ fn reopened_after_subscribe() { - +@@ -213,6 +213,7 @@ fn reopened_after_subscribe() { + } + #[test] +#[cfg(panic = "unwind")] #[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn send_modify_panic() { let (tx, mut rx) = watch::channel("one"); - diff --git a/src/fs/file/tests.rs b/src/fs/file/tests.rs index 1c90a8d..7c61b3c 100644 --- a/src/fs/file/tests.rs +++ b/src/fs/file/tests.rs @@ -231,12 +231,12 @@ fn flush_while_idle() { #[cfg_attr(miri, ignore)] // takes a really long time with miri fn read_with_buffer_larger_than_max() { // Chunks - let chunk_a = 16 * 1024; + let chunk_a = crate::io::blocking::MAX_BUF; let chunk_b = chunk_a * 2; let chunk_c = chunk_a * 3; let chunk_d = chunk_a * 4; - assert_eq!(chunk_d / 1024, 64); + assert_eq!(chunk_d / 1024 / 1024, 8); let mut data = vec![]; for i in 0..(chunk_d - 1) { @@ -303,12 +303,12 @@ fn read_with_buffer_larger_than_max() { #[cfg_attr(miri, ignore)] // takes a really long time with miri fn write_with_buffer_larger_than_max() { // Chunks - let chunk_a = 16 * 1024; + let chunk_a = crate::io::blocking::MAX_BUF; let chunk_b = chunk_a * 2; let chunk_c = chunk_a * 3; let chunk_d = chunk_a * 4; - assert_eq!(chunk_d / 1024, 64); + assert_eq!(chunk_d / 1024 / 1024, 8); let mut data = vec![]; for i in 0..(chunk_d - 1) { diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index 10ad150..9471e8c 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -1,5 +1,6 @@ use crate::fs::asyncify; +use std::collections::VecDeque; use std::ffi::OsString; use std::fs::{FileType, Metadata}; use std::future::Future; @@ -19,6 +20,8 @@ use crate::blocking::spawn_blocking; #[cfg(not(test))] use crate::blocking::JoinHandle; +const CHUNK_SIZE: usize = 32; + /// Returns a stream over the entries within a directory. /// /// This is an async version of [`std::fs::read_dir`](std::fs::read_dir) @@ -29,9 +32,14 @@ use crate::blocking::JoinHandle; /// [`spawn_blocking`]: crate::task::spawn_blocking pub async fn read_dir(path: impl AsRef) -> io::Result { let path = path.as_ref().to_owned(); - let std = asyncify(|| std::fs::read_dir(path)).await?; + asyncify(|| -> io::Result { + let mut std = std::fs::read_dir(path)?; + let mut buf = VecDeque::with_capacity(CHUNK_SIZE); + ReadDir::next_chunk(&mut buf, &mut std); - Ok(ReadDir(State::Idle(Some(std)))) + Ok(ReadDir(State::Idle(Some((buf, std))))) + }) + .await } /// Reads the entries in a directory. @@ -58,8 +66,8 @@ pub struct ReadDir(State); #[derive(Debug)] enum State { - Idle(Option), - Pending(JoinHandle<(Option>, std::fs::ReadDir)>), + Idle(Option<(VecDeque>, std::fs::ReadDir)>), + Pending(JoinHandle<(VecDeque>, std::fs::ReadDir)>), } impl ReadDir { @@ -94,29 +102,57 @@ impl ReadDir { pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match self.0 { - State::Idle(ref mut std) => { - let mut std = std.take().unwrap(); + State::Idle(ref mut data) => { + let (buf, _) = data.as_mut().unwrap(); + + if let Some(ent) = buf.pop_front() { + return Poll::Ready(ent.map(Some)); + }; + + let (mut buf, mut std) = data.take().unwrap(); self.0 = State::Pending(spawn_blocking(move || { - let ret = std.next(); - (ret, std) + ReadDir::next_chunk(&mut buf, &mut std); + (buf, std) })); } State::Pending(ref mut rx) => { - let (ret, std) = ready!(Pin::new(rx).poll(cx))?; - self.0 = State::Idle(Some(std)); + let (mut buf, std) = ready!(Pin::new(rx).poll(cx))?; - let ret = match ret { - Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))), + let ret = match buf.pop_front() { + Some(Ok(x)) => Ok(Some(x)), Some(Err(e)) => Err(e), None => Ok(None), }; + self.0 = State::Idle(Some((buf, std))); + return Poll::Ready(ret); } } } } + + fn next_chunk(buf: &mut VecDeque>, std: &mut std::fs::ReadDir) { + for ret in std.by_ref().take(CHUNK_SIZE) { + let success = ret.is_ok(); + + buf.push_back(ret.map(|std| DirEntry { + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks" + )))] + file_type: std.file_type().ok(), + std: Arc::new(std), + })); + + if !success { + break; + } + } + } } feature! { @@ -160,7 +196,16 @@ feature! { /// filesystem. Each entry can be inspected via methods to learn about the full /// path or possibly other metadata through per-platform extension traits. #[derive(Debug)] -pub struct DirEntry(Arc); +pub struct DirEntry { + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks" + )))] + file_type: Option, + std: Arc, +} impl DirEntry { /// Returns the full path to the file that this entry represents. @@ -193,7 +238,7 @@ impl DirEntry { /// /// The exact text, of course, depends on what files you have in `.`. pub fn path(&self) -> PathBuf { - self.0.path() + self.std.path() } /// Returns the bare file name of this directory entry without any other @@ -214,7 +259,7 @@ impl DirEntry { /// # } /// ``` pub fn file_name(&self) -> OsString { - self.0.file_name() + self.std.file_name() } /// Returns the metadata for the file that this entry points at. @@ -248,7 +293,7 @@ impl DirEntry { /// # } /// ``` pub async fn metadata(&self) -> io::Result { - let std = self.0.clone(); + let std = self.std.clone(); asyncify(move || std.metadata()).await } @@ -283,13 +328,23 @@ impl DirEntry { /// # } /// ``` pub async fn file_type(&self) -> io::Result { - let std = self.0.clone(); + #[cfg(not(any( + target_os = "solaris", + target_os = "illumos", + target_os = "haiku", + target_os = "vxworks" + )))] + if let Some(file_type) = self.file_type { + return Ok(file_type); + } + + let std = self.std.clone(); asyncify(move || std.file_type()).await } /// Returns a reference to the underlying `std::fs::DirEntry`. #[cfg(unix)] pub(super) fn as_inner(&self) -> &std::fs::DirEntry { - &self.0 + &self.std } } diff --git a/src/io/blocking.rs b/src/io/blocking.rs index f6db450..416573e 100644 --- a/src/io/blocking.rs +++ b/src/io/blocking.rs @@ -26,7 +26,7 @@ pub(crate) struct Buf { pos: usize, } -pub(crate) const MAX_BUF: usize = 16 * 1024; +pub(crate) const MAX_BUF: usize = 2 * 1024 * 1024; #[derive(Debug)] enum State { diff --git a/src/io/split.rs b/src/io/split.rs index 2e0da95..f067b65 100644 --- a/src/io/split.rs +++ b/src/io/split.rs @@ -75,7 +75,10 @@ impl ReadHalf { /// This can be checked ahead of time by comparing the stream ID /// of the two halves. #[track_caller] - pub fn unsplit(self, wr: WriteHalf) -> T { + pub fn unsplit(self, wr: WriteHalf) -> T + where + T: Unpin, + { if self.is_pair_of(&wr) { drop(wr); diff --git a/src/io/stdio_common.rs b/src/io/stdio_common.rs index 2715ba7..b1cc61d 100644 --- a/src/io/stdio_common.rs +++ b/src/io/stdio_common.rs @@ -108,14 +108,13 @@ where #[cfg(test)] #[cfg(not(loom))] mod tests { + use crate::io::blocking::MAX_BUF; use crate::io::AsyncWriteExt; use std::io; use std::pin::Pin; use std::task::Context; use std::task::Poll; - const MAX_BUF: usize = 16 * 1024; - struct TextMockWriter; impl crate::io::AsyncWrite for TextMockWriter { diff --git a/src/io/util/read.rs b/src/io/util/read.rs index edc9d5a..a1f9c8a 100644 --- a/src/io/util/read.rs +++ b/src/io/util/read.rs @@ -48,7 +48,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let me = self.project(); - let mut buf = ReadBuf::new(*me.buf); + let mut buf = ReadBuf::new(me.buf); ready!(Pin::new(me.reader).poll_read(cx, &mut buf))?; Poll::Ready(Ok(buf.filled().len())) } diff --git a/src/lib.rs b/src/lib.rs index e745fe9..05767d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -174,12 +174,15 @@ //! swapping the currently running task on each thread. However, this kind of //! swapping can only happen at `.await` points, so code that spends a long time //! without reaching an `.await` will prevent other tasks from running. To -//! combat this, Tokio provides two kinds of threads: Core threads and blocking -//! threads. The core threads are where all asynchronous code runs, and Tokio -//! will by default spawn one for each CPU core. The blocking threads are -//! spawned on demand, can be used to run blocking code that would otherwise -//! block other tasks from running and are kept alive when not used for a certain -//! amount of time which can be configured with [`thread_keep_alive`]. +//! combat this, Tokio provides two kinds of threads: Core threads and blocking threads. +//! +//! The core threads are where all asynchronous code runs, and Tokio will by default +//! spawn one for each CPU core. You can use the environment variable `TOKIO_WORKER_THREADS` +//! to override the default value. +//! +//! The blocking threads are spawned on demand, can be used to run blocking code +//! that would otherwise block other tasks from running and are kept alive when +//! not used for a certain amount of time which can be configured with [`thread_keep_alive`]. //! Since it is not possible for Tokio to swap out blocking tasks, like it //! can do with asynchronous code, the upper limit on the number of blocking //! threads is very large. These limits can be configured on the [`Builder`]. diff --git a/src/loom/mocked.rs b/src/loom/mocked.rs index 1c4a32d..56dc1a0 100644 --- a/src/loom/mocked.rs +++ b/src/loom/mocked.rs @@ -25,6 +25,13 @@ pub(crate) mod sync { } } pub(crate) use loom::sync::*; + + pub(crate) mod atomic { + pub(crate) use loom::sync::atomic::*; + + // TODO: implement a loom version + pub(crate) type StaticAtomicU64 = std::sync::atomic::AtomicU64; + } } pub(crate) mod rand { diff --git a/src/loom/std/atomic_u64.rs b/src/loom/std/atomic_u64.rs index 5d1d8a8..ce391be 100644 --- a/src/loom/std/atomic_u64.rs +++ b/src/loom/std/atomic_u64.rs @@ -7,12 +7,13 @@ // `#[cfg(target_has_atomic = "64")]`. // Refs: https://github.com/rust-lang/rust/tree/master/src/librustc_target cfg_has_atomic_u64! { - pub(crate) use std::sync::atomic::AtomicU64; + #[path = "atomic_u64_native.rs"] + mod imp; } cfg_not_has_atomic_u64! { #[path = "atomic_u64_as_mutex.rs"] - mod atomic_u64_as_mutex; - - pub(crate) use atomic_u64_as_mutex::AtomicU64; + mod imp; } + +pub(crate) use imp::{AtomicU64, StaticAtomicU64}; diff --git a/src/loom/std/atomic_u64_as_mutex.rs b/src/loom/std/atomic_u64_as_mutex.rs index 84ddff0..9b3b6fa 100644 --- a/src/loom/std/atomic_u64_as_mutex.rs +++ b/src/loom/std/atomic_u64_as_mutex.rs @@ -1,18 +1,24 @@ use crate::loom::sync::Mutex; use std::sync::atomic::Ordering; +cfg_has_const_mutex_new! { + #[path = "atomic_u64_static_const_new.rs"] + mod static_macro; +} + +cfg_not_has_const_mutex_new! { + #[path = "atomic_u64_static_once_cell.rs"] + mod static_macro; +} + +pub(crate) use static_macro::StaticAtomicU64; + #[derive(Debug)] pub(crate) struct AtomicU64 { inner: Mutex, } impl AtomicU64 { - pub(crate) fn new(val: u64) -> Self { - Self { - inner: Mutex::new(val), - } - } - pub(crate) fn load(&self, _: Ordering) -> u64 { *self.inner.lock() } diff --git a/src/loom/std/atomic_u64_native.rs b/src/loom/std/atomic_u64_native.rs new file mode 100644 index 0000000..08adb28 --- /dev/null +++ b/src/loom/std/atomic_u64_native.rs @@ -0,0 +1,4 @@ +pub(crate) use std::sync::atomic::{AtomicU64, Ordering}; + +/// Alias `AtomicU64` to `StaticAtomicU64` +pub(crate) type StaticAtomicU64 = AtomicU64; diff --git a/src/loom/std/atomic_u64_static_const_new.rs b/src/loom/std/atomic_u64_static_const_new.rs new file mode 100644 index 0000000..a421534 --- /dev/null +++ b/src/loom/std/atomic_u64_static_const_new.rs @@ -0,0 +1,12 @@ +use super::AtomicU64; +use crate::loom::sync::Mutex; + +pub(crate) type StaticAtomicU64 = AtomicU64; + +impl AtomicU64 { + pub(crate) const fn new(val: u64) -> Self { + Self { + inner: Mutex::const_new(val), + } + } +} diff --git a/src/loom/std/atomic_u64_static_once_cell.rs b/src/loom/std/atomic_u64_static_once_cell.rs new file mode 100644 index 0000000..40c6172 --- /dev/null +++ b/src/loom/std/atomic_u64_static_once_cell.rs @@ -0,0 +1,57 @@ +use super::AtomicU64; +use crate::loom::sync::{atomic::Ordering, Mutex}; +use crate::util::once_cell::OnceCell; + +pub(crate) struct StaticAtomicU64 { + init: u64, + cell: OnceCell>, +} + +impl AtomicU64 { + pub(crate) fn new(val: u64) -> Self { + Self { + inner: Mutex::new(val), + } + } +} + +impl StaticAtomicU64 { + pub(crate) const fn new(val: u64) -> StaticAtomicU64 { + StaticAtomicU64 { + init: val, + cell: OnceCell::new(), + } + } + + pub(crate) fn load(&self, order: Ordering) -> u64 { + *self.inner().lock() + } + + pub(crate) fn fetch_add(&self, val: u64, order: Ordering) -> u64 { + let mut lock = self.inner().lock(); + let prev = *lock; + *lock = prev + val; + prev + } + + pub(crate) fn compare_exchange_weak( + &self, + current: u64, + new: u64, + _success: Ordering, + _failure: Ordering, + ) -> Result { + let mut lock = self.inner().lock(); + + if *lock == current { + *lock = new; + Ok(current) + } else { + Err(*lock) + } + } + + fn inner(&self) -> &Mutex { + self.cell.get(|| Mutex::new(self.init)) + } +} diff --git a/src/loom/std/mod.rs b/src/loom/std/mod.rs index 1fc0032..6bd1ad9 100644 --- a/src/loom/std/mod.rs +++ b/src/loom/std/mod.rs @@ -71,7 +71,7 @@ pub(crate) mod sync { pub(crate) mod atomic { pub(crate) use crate::loom::std::atomic_u16::AtomicU16; pub(crate) use crate::loom::std::atomic_u32::AtomicU32; - pub(crate) use crate::loom::std::atomic_u64::AtomicU64; + pub(crate) use crate::loom::std::atomic_u64::{AtomicU64, StaticAtomicU64}; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering}; @@ -81,7 +81,27 @@ pub(crate) mod sync { pub(crate) mod sys { #[cfg(feature = "rt-multi-thread")] pub(crate) fn num_cpus() -> usize { - usize::max(1, num_cpus::get()) + const ENV_WORKER_THREADS: &str = "TOKIO_WORKER_THREADS"; + + match std::env::var(ENV_WORKER_THREADS) { + Ok(s) => { + let n = s.parse().unwrap_or_else(|e| { + panic!( + "\"{}\" must be usize, error: {}, value: {}", + ENV_WORKER_THREADS, e, s + ) + }); + assert!(n > 0, "\"{}\" cannot be set to 0", ENV_WORKER_THREADS); + n + } + Err(std::env::VarError::NotPresent) => usize::max(1, num_cpus::get()), + Err(std::env::VarError::NotUnicode(e)) => { + panic!( + "\"{}\" must be valid unicode, error: {:?}", + ENV_WORKER_THREADS, e + ) + } + } } #[cfg(not(feature = "rt-multi-thread"))] diff --git a/src/macros/cfg.rs b/src/macros/cfg.rs index 2eea344..1c66d24 100644 --- a/src/macros/cfg.rs +++ b/src/macros/cfg.rs @@ -461,14 +461,14 @@ macro_rules! cfg_not_coop { macro_rules! cfg_has_atomic_u64 { ($($item:item)*) => { $( - #[cfg(not(any( - target_arch = "arm", - target_arch = "mips", - target_arch = "powerpc", - target_arch = "riscv32", - tokio_wasm, - tokio_no_atomic_u64, - )))] + #[cfg_attr( + not(tokio_no_target_has_atomic), + cfg(all(target_has_atomic = "64", not(tokio_no_atomic_u64)) + ))] + #[cfg_attr( + tokio_no_target_has_atomic, + cfg(not(tokio_no_atomic_u64)) + )] $item )* } @@ -477,14 +477,14 @@ macro_rules! cfg_has_atomic_u64 { macro_rules! cfg_not_has_atomic_u64 { ($($item:item)*) => { $( - #[cfg(any( - target_arch = "arm", - target_arch = "mips", - target_arch = "powerpc", - target_arch = "riscv32", - tokio_wasm, - tokio_no_atomic_u64, + #[cfg_attr( + not(tokio_no_target_has_atomic), + cfg(any(not(target_has_atomic = "64"), tokio_no_atomic_u64) ))] + #[cfg_attr( + tokio_no_target_has_atomic, + cfg(tokio_no_atomic_u64) + )] $item )* } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 4a022fa..4441313 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -195,15 +195,22 @@ impl TcpListener { /// Creates new `TcpListener` from a `std::net::TcpListener`. /// /// This function is intended to be used to wrap a TCP listener from the - /// standard library in the Tokio equivalent. The conversion assumes nothing - /// about the underlying listener; it is left up to the user to set it in - /// non-blocking mode. + /// standard library in the Tokio equivalent. /// /// This API is typically paired with the `socket2` crate and the `Socket` /// type to build up and customize a listener before it's shipped off to the /// backing event loop. This allows configuration of options like /// `SO_REUSEPORT`, binding to multiple addresses, etc. /// + /// # Notes + /// + /// The caller is responsible for ensuring that the listener is in + /// non-blocking mode. Otherwise all I/O operations on the listener + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::net::TcpListener::set_nonblocking + /// /// # Examples /// /// ```rust,no_run diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs index 9453411..09349fe 100644 --- a/src/net/tcp/socket.rs +++ b/src/net/tcp/socket.rs @@ -670,6 +670,15 @@ impl TcpSocket { /// [`std::net::TcpStream`]: struct@std::net::TcpStream /// [`socket2`]: https://docs.rs/socket2/ /// + /// # Notes + /// + /// The caller is responsible for ensuring that the socket is in + /// non-blocking mode. Otherwise all I/O operations on the socket + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking + /// /// # Examples /// /// ``` @@ -678,8 +687,8 @@ impl TcpSocket { /// /// #[tokio::main] /// async fn main() -> std::io::Result<()> { - /// /// let socket2_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; + /// socket2_socket.set_nonblocking(true)?; /// /// let socket = TcpSocket::from_std_stream(socket2_socket.into()); /// diff --git a/src/net/tcp/split_owned.rs b/src/net/tcp/split_owned.rs index b2730e8..53fc5f0 100644 --- a/src/net/tcp/split_owned.rs +++ b/src/net/tcp/split_owned.rs @@ -490,12 +490,12 @@ impl AsyncWrite for OwnedWriteHalf { impl AsRef for OwnedReadHalf { fn as_ref(&self) -> &TcpStream { - &*self.inner + &self.inner } } impl AsRef for OwnedWriteHalf { fn as_ref(&self) -> &TcpStream { - &*self.inner + &self.inner } } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index b7dd337..b17d33f 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -165,9 +165,16 @@ impl TcpStream { /// Creates new `TcpStream` from a `std::net::TcpStream`. /// /// This function is intended to be used to wrap a TCP stream from the - /// standard library in the Tokio equivalent. The conversion assumes nothing - /// about the underlying stream; it is left up to the user to set it in - /// non-blocking mode. + /// standard library in the Tokio equivalent. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the stream is in + /// non-blocking mode. Otherwise all I/O operations on the stream + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking /// /// # Examples /// diff --git a/src/net/udp.rs b/src/net/udp.rs index af343f2..213d914 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -179,14 +179,21 @@ impl UdpSocket { /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`. /// /// This function is intended to be used to wrap a UDP socket from the - /// standard library in the Tokio equivalent. The conversion assumes nothing - /// about the underlying socket; it is left up to the user to set it in - /// non-blocking mode. + /// standard library in the Tokio equivalent. /// /// This can be used in conjunction with socket2's `Socket` interface to /// configure a socket before it's handed off, such as setting options like /// `reuse_address` or binding to multiple addresses. /// + /// # Notes + /// + /// The caller is responsible for ensuring that the socket is in + /// non-blocking mode. Otherwise all I/O operations on the socket + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::net::UdpSocket::set_nonblocking + /// /// # Panics /// /// This function panics if thread-local runtime is not set. diff --git a/src/net/unix/datagram/socket.rs b/src/net/unix/datagram/socket.rs index 5e1453e..76c6b19 100644 --- a/src/net/unix/datagram/socket.rs +++ b/src/net/unix/datagram/socket.rs @@ -426,9 +426,16 @@ impl UnixDatagram { /// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`. /// /// This function is intended to be used to wrap a UnixDatagram from the - /// standard library in the Tokio equivalent. The conversion assumes - /// nothing about the underlying datagram; it is left up to the user to set - /// it in non-blocking mode. + /// standard library in the Tokio equivalent. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the socker is in + /// non-blocking mode. Otherwise all I/O operations on the socket + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking /// /// # Panics /// @@ -470,21 +477,19 @@ impl UnixDatagram { /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`]. /// /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking - /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode + /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode /// if needed. /// /// # Examples /// /// ```rust,no_run - /// use std::error::Error; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// let tokio_socket = tokio::net::UnixDatagram::bind("127.0.0.1:0")?; - /// let std_socket = tokio_socket.into_std()?; - /// std_socket.set_nonblocking(false)?; - /// Ok(()) - /// } + /// # use std::error::Error; + /// # async fn dox() -> Result<(), Box> { + /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?; + /// let std_socket = tokio_socket.into_std()?; + /// std_socket.set_nonblocking(false)?; + /// # Ok(()) + /// # } /// ``` /// /// [`tokio::net::UnixDatagram`]: UnixDatagram diff --git a/src/net/unix/listener.rs b/src/net/unix/listener.rs index fbea3e7..9887f73 100644 --- a/src/net/unix/listener.rs +++ b/src/net/unix/listener.rs @@ -73,9 +73,31 @@ impl UnixListener { /// Creates new `UnixListener` from a `std::os::unix::net::UnixListener `. /// /// This function is intended to be used to wrap a UnixListener from the - /// standard library in the Tokio equivalent. The conversion assumes - /// nothing about the underlying listener; it is left up to the user to set - /// it in non-blocking mode. + /// standard library in the Tokio equivalent. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the listener is in + /// non-blocking mode. Otherwise all I/O operations on the listener + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::os::unix::net::UnixListener::set_nonblocking + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixListener; + /// use std::os::unix::net::UnixListener as StdUnixListener; + /// # use std::error::Error; + /// + /// # async fn dox() -> Result<(), Box> { + /// let std_listener = StdUnixListener::bind("/path/to/the/socket")?; + /// std_listener.set_nonblocking(true)?; + /// let listener = UnixListener::from_std(std_listener)?; + /// # Ok(()) + /// # } + /// ``` /// /// # Panics /// @@ -95,20 +117,18 @@ impl UnixListener { /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. /// /// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode - /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed. + /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed. /// /// # Examples /// /// ```rust,no_run - /// use std::error::Error; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// let tokio_listener = tokio::net::UnixListener::bind("127.0.0.1:0")?; - /// let std_listener = tokio_listener.into_std()?; - /// std_listener.set_nonblocking(false)?; - /// Ok(()) - /// } + /// # use std::error::Error; + /// # async fn dox() -> Result<(), Box> { + /// let tokio_listener = tokio::net::UnixListener::bind("/path/to/the/socket")?; + /// let std_listener = tokio_listener.into_std()?; + /// std_listener.set_nonblocking(false)?; + /// # Ok(()) + /// # } /// ``` /// /// [`tokio::net::UnixListener`]: UnixListener diff --git a/src/net/unix/split_owned.rs b/src/net/unix/split_owned.rs index da41ced..2cb561d 100644 --- a/src/net/unix/split_owned.rs +++ b/src/net/unix/split_owned.rs @@ -398,12 +398,12 @@ impl AsyncWrite for OwnedWriteHalf { impl AsRef for OwnedReadHalf { fn as_ref(&self) -> &UnixStream { - &*self.inner + &self.inner } } impl AsRef for OwnedWriteHalf { fn as_ref(&self) -> &UnixStream { - &*self.inner + &self.inner } } diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index 2d27898..c249bf4 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -709,9 +709,31 @@ impl UnixStream { /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. /// /// This function is intended to be used to wrap a UnixStream from the - /// standard library in the Tokio equivalent. The conversion assumes - /// nothing about the underlying stream; it is left up to the user to set - /// it in non-blocking mode. + /// standard library in the Tokio equivalent. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the stream is in + /// non-blocking mode. Otherwise all I/O operations on the stream + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// use std::os::unix::net::UnixStream as StdUnixStream; + /// # use std::error::Error; + /// + /// # async fn dox() -> Result<(), Box> { + /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?; + /// std_stream.set_nonblocking(true)?; + /// let stream = UnixStream::from_std(std_stream)?; + /// # Ok(()) + /// # } + /// ``` /// /// # Panics /// diff --git a/src/net/windows/named_pipe.rs b/src/net/windows/named_pipe.rs index 692c69d..9ede94e 100644 --- a/src/net/windows/named_pipe.rs +++ b/src/net/windows/named_pipe.rs @@ -1705,11 +1705,10 @@ impl ServerOptions { /// /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self { - self.pipe_mode = match pipe_mode { - PipeMode::Byte => windows_sys::PIPE_TYPE_BYTE, - PipeMode::Message => windows_sys::PIPE_TYPE_MESSAGE, - }; - + let is_msg = matches!(pipe_mode, PipeMode::Message); + // Pipe mode is implemented as a bit flag 0x4. Set is message and unset + // is byte. + bool_flag!(self.pipe_mode, is_msg, windows_sys::PIPE_TYPE_MESSAGE); self } @@ -2554,3 +2553,48 @@ unsafe fn named_pipe_info(handle: RawHandle) -> io::Result { max_instances, }) } + +#[cfg(test)] +mod test { + use self::windows_sys::{PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE}; + use super::*; + + #[test] + fn opts_default_pipe_mode() { + let opts = ServerOptions::new(); + assert_eq!(opts.pipe_mode, PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS); + } + + #[test] + fn opts_unset_reject_remote() { + let mut opts = ServerOptions::new(); + opts.reject_remote_clients(false); + assert_eq!(opts.pipe_mode & PIPE_REJECT_REMOTE_CLIENTS, 0); + } + + #[test] + fn opts_set_pipe_mode_maintains_reject_remote_clients() { + let mut opts = ServerOptions::new(); + opts.pipe_mode(PipeMode::Byte); + assert_eq!(opts.pipe_mode, PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS); + + opts.reject_remote_clients(false); + opts.pipe_mode(PipeMode::Byte); + assert_eq!(opts.pipe_mode, PIPE_TYPE_BYTE); + + opts.reject_remote_clients(true); + opts.pipe_mode(PipeMode::Byte); + assert_eq!(opts.pipe_mode, PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS); + + opts.reject_remote_clients(false); + opts.pipe_mode(PipeMode::Message); + assert_eq!(opts.pipe_mode, PIPE_TYPE_MESSAGE); + + opts.reject_remote_clients(true); + opts.pipe_mode(PipeMode::Message); + assert_eq!( + opts.pipe_mode, + PIPE_TYPE_MESSAGE | PIPE_REJECT_REMOTE_CLIENTS + ); + } +} diff --git a/src/process/unix/mod.rs b/src/process/unix/mod.rs index 0345083..78c792c 100644 --- a/src/process/unix/mod.rs +++ b/src/process/unix/mod.rs @@ -156,7 +156,7 @@ impl Future for Child { #[derive(Debug)] pub(crate) struct Pipe { - // Actually a pipe and not a File. However, we are reusing `File` to get + // Actually a pipe is not a File. However, we are reusing `File` to get // close on drop. This is a similar trick as `mio`. fd: File, } diff --git a/src/process/unix/orphan.rs b/src/process/unix/orphan.rs index 66572ef..3407196 100644 --- a/src/process/unix/orphan.rs +++ b/src/process/unix/orphan.rs @@ -294,7 +294,7 @@ pub(crate) mod test { #[cfg_attr(miri, ignore)] // Miri does not support epoll. #[test] fn does_not_register_signal_if_queue_empty() { - let (io_driver, io_handle) = IoDriver::new().unwrap(); + let (io_driver, io_handle) = IoDriver::new(1024).unwrap(); let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap(); let handle = signal_driver.handle(); diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs index 88bdcfd..c42924b 100644 --- a/src/runtime/blocking/mod.rs +++ b/src/runtime/blocking/mod.rs @@ -17,8 +17,6 @@ cfg_trace! { mod schedule; mod shutdown; mod task; -#[cfg(all(test, not(tokio_wasm)))] -pub(crate) use schedule::NoopSchedule; pub(crate) use task::BlockingTask; use crate::runtime::Builder; diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 9c53614..e9f6b66 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -2,7 +2,7 @@ use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; -use crate::runtime::blocking::schedule::NoopSchedule; +use crate::runtime::blocking::schedule::BlockingSchedule; use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; @@ -120,7 +120,7 @@ struct Shared { } pub(crate) struct Task { - task: task::UnownedTask, + task: task::UnownedTask, mandatory: Mandatory, } @@ -151,7 +151,7 @@ impl From for io::Error { } impl Task { - pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { + pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { Task { task, mandatory } } @@ -379,7 +379,8 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule, id); + let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); + let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) } diff --git a/src/runtime/blocking/schedule.rs b/src/runtime/blocking/schedule.rs index 5425224..edf775b 100644 --- a/src/runtime/blocking/schedule.rs +++ b/src/runtime/blocking/schedule.rs @@ -1,15 +1,52 @@ +#[cfg(feature = "test-util")] +use crate::runtime::scheduler; use crate::runtime::task::{self, Task}; +use crate::runtime::Handle; -/// `task::Schedule` implementation that does nothing. This is unique to the -/// blocking scheduler as tasks scheduled are not really futures but blocking -/// operations. +/// `task::Schedule` implementation that does nothing (except some bookkeeping +/// in test-util builds). This is unique to the blocking scheduler as tasks +/// scheduled are not really futures but blocking operations. /// /// We avoid storing the task by forgetting it in `bind` and re-materializing it -/// in `release. -pub(crate) struct NoopSchedule; +/// in `release`. +pub(crate) struct BlockingSchedule { + #[cfg(feature = "test-util")] + handle: Handle, +} + +impl BlockingSchedule { + #[cfg_attr(not(feature = "test-util"), allow(unused_variables))] + pub(crate) fn new(handle: &Handle) -> Self { + #[cfg(feature = "test-util")] + { + match &handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.inhibit_auto_advance(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } + BlockingSchedule { + #[cfg(feature = "test-util")] + handle: handle.clone(), + } + } +} -impl task::Schedule for NoopSchedule { +impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task) -> Option> { + #[cfg(feature = "test-util")] + { + match &self.handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.allow_auto_advance(); + handle.driver.unpark(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } None } diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs index d49a4da..ea0df2e 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/builder.rs @@ -44,6 +44,7 @@ pub struct Builder { /// Whether or not to enable the I/O driver enable_io: bool, + nevents: usize, /// Whether or not to enable the time driver enable_time: bool, @@ -181,6 +182,7 @@ cfg_unstable! { pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; +#[derive(Clone, Copy)] pub(crate) enum Kind { CurrentThread, #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] @@ -228,6 +230,7 @@ impl Builder { // I/O defaults to "off" enable_io: false, + nevents: 1024, // Time defaults to "off" enable_time: false, @@ -235,6 +238,7 @@ impl Builder { // The clock starts not-paused start_paused: false, + // Read from environment variable first in multi-threaded mode. // Default to lazy auto-detection (one thread per CPU core) worker_threads: None, @@ -302,6 +306,8 @@ impl Builder { /// This can be any number above 0 though it is advised to keep this value /// on the smaller side. /// + /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`. + /// /// # Default /// /// The default value is the number of cores available to the system. @@ -647,6 +653,7 @@ impl Builder { enable_io: self.enable_io, enable_time: self.enable_time, start_paused: self.start_paused, + nevents: self.nevents, } } @@ -938,6 +945,25 @@ cfg_io_driver! { self.enable_io = true; self } + + /// Enables the I/O driver and configures the max number of events to be + /// processed per tick. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_current_thread() + /// .enable_io() + /// .max_io_events_per_tick(1024) + /// .build() + /// .unwrap(); + /// ``` + pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self { + self.nevents = capacity; + self + } } } diff --git a/src/runtime/context.rs b/src/runtime/context.rs index 2e54c8b..fef53ca 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -15,6 +15,10 @@ cfg_rt! { } struct Context { + /// Uniquely identifies the current thread + #[cfg(feature = "rt")] + thread_id: Cell>, + /// Handle to the runtime scheduler running on the current thread. #[cfg(feature = "rt")] handle: RefCell>, @@ -46,6 +50,9 @@ struct Context { tokio_thread_local! { static CONTEXT: Context = { Context { + #[cfg(feature = "rt")] + thread_id: Cell::new(None), + /// Tracks the current runtime handle to use when spawning, /// accessing drivers, etc... #[cfg(feature = "rt")] @@ -82,10 +89,23 @@ pub(super) fn budget(f: impl FnOnce(&Cell) -> R) -> Result Result { + CONTEXT.try_with(|ctx| { + match ctx.thread_id.get() { + Some(id) => id, + None => { + let id = ThreadId::next(); + ctx.thread_id.set(Some(id)); + id + } + } + }) + } + #[derive(Debug, Clone, Copy)] #[must_use] pub(crate) enum EnterRuntime { diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs index 8f9c512..4fb6b87 100644 --- a/src/runtime/driver.rs +++ b/src/runtime/driver.rs @@ -36,11 +36,12 @@ pub(crate) struct Cfg { pub(crate) enable_time: bool, pub(crate) enable_pause_time: bool, pub(crate) start_paused: bool, + pub(crate) nevents: usize, } impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> { - let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; + let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?; let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); @@ -135,12 +136,12 @@ cfg_io_driver! { Disabled(UnparkThread), } - fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { + fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> { #[cfg(loom)] assert!(!enabled); let ret = if enabled { - let (io_driver, io_handle) = crate::runtime::io::Driver::new()?; + let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?; let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?; let process_driver = create_process_driver(signal_driver); @@ -201,7 +202,7 @@ cfg_not_io_driver! { #[derive(Debug)] pub(crate) struct IoStack(ParkThread); - fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { + fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> { let park_thread = ParkThread::new(); let unpark_thread = park_thread.unpark(); Ok((IoStack(park_thread), unpark_thread, Default::default())) diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs index da47ecb..c5dc65f 100644 --- a/src/runtime/handle.rs +++ b/src/runtime/handle.rs @@ -118,9 +118,9 @@ impl Handle { /// thread pool. The thread pool is then responsible for polling the future /// until it completes. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the background - /// immediately when `spawn` is called. + /// The provided future will start running in the background immediately + /// when `spawn` is called, even if you don't await the returned + /// `JoinHandle`. /// /// See [module level][mod] documentation for more details. /// diff --git a/src/runtime/io/mod.rs b/src/runtime/io/mod.rs index 02039f2..2e578b6 100644 --- a/src/runtime/io/mod.rs +++ b/src/runtime/io/mod.rs @@ -60,6 +60,7 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, pub(crate) ready: Ready, + is_shutdown: bool, } struct IoDispatcher { @@ -104,7 +105,7 @@ fn _assert_kinds() { impl Driver { /// Creates a new event loop, returning any error that happened during the /// creation. - pub(crate) fn new() -> io::Result<(Driver, Handle)> { + pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { let poll = mio::Poll::new()?; #[cfg(not(tokio_wasi))] let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; @@ -116,7 +117,7 @@ impl Driver { let driver = Driver { tick: 0, signal_ready: false, - events: mio::Events::with_capacity(1024), + events: mio::Events::with_capacity(nevents), poll, resources: slab, }; @@ -147,9 +148,8 @@ impl Driver { if handle.shutdown() { self.resources.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. And shutdown will clear all wakers. + // If a task is waiting on the I/O resource, notify it that the + // runtime is being shutdown. And shutdown will clear all wakers. io.shutdown(); }); } @@ -282,16 +282,12 @@ impl Handle { true } - fn is_shutdown(&self) -> bool { - return self.io_dispatch.read().unwrap().is_shutdown; - } - fn allocate(&self) -> io::Result<(slab::Address, slab::Ref)> { let io = self.io_dispatch.read().unwrap(); if io.is_shutdown { return Err(io::Error::new( io::ErrorKind::Other, - "failed to find event loop", + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, )); } io.allocator.allocate().ok_or_else(|| { diff --git a/src/runtime/io/registration.rs b/src/runtime/io/registration.rs index 7b95f7f..140b924 100644 --- a/src/runtime/io/registration.rs +++ b/src/runtime/io/registration.rs @@ -148,7 +148,7 @@ impl Registration { let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); - if self.handle().is_shutdown() { + if ev.is_shutdown { return Poll::Ready(Err(gone())); } @@ -217,28 +217,22 @@ impl Drop for Registration { } fn gone() -> io::Error { - io::Error::new(io::ErrorKind::Other, "IO driver has terminated") + io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, + ) } cfg_io_readiness! { impl Registration { pub(crate) async fn readiness(&self, interest: Interest) -> io::Result { - use std::future::Future; - use std::pin::Pin; + let ev = self.shared.readiness(interest).await; - let fut = self.shared.readiness(interest); - pin!(fut); - - crate::future::poll_fn(|cx| { - if self.handle().is_shutdown() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR - ))); - } + if ev.is_shutdown { + return Err(gone()) + } - Pin::new(&mut fut).poll(cx).map(Ok) - }).await + Ok(ev) } pub(crate) async fn async_io(&self, interest: Interest, mut f: impl FnMut() -> io::Result) -> io::Result { diff --git a/src/runtime/io/scheduled_io.rs b/src/runtime/io/scheduled_io.rs index 1709091..197a4e0 100644 --- a/src/runtime/io/scheduled_io.rs +++ b/src/runtime/io/scheduled_io.rs @@ -46,9 +46,6 @@ struct Waiters { /// Waker used for AsyncWrite. writer: Option, - - /// True if this ScheduledIo has been killed due to IO driver shutdown. - is_shutdown: bool, } cfg_io_readiness! { @@ -95,7 +92,7 @@ cfg_io_readiness! { // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. // -// | reserved | generation | driver tick | readiness | +// | shutdown | generation | driver tick | readiness | // |----------+------------+--------------+-----------| // | 1 bit | 7 bits + 8 bits + 16 bits | @@ -105,6 +102,8 @@ const TICK: bit::Pack = READINESS.then(8); const GENERATION: bit::Pack = TICK.then(7); +const SHUTDOWN: bit::Pack = GENERATION.then(1); + #[test] fn test_generations_assert_same() { assert_eq!(super::GENERATION, GENERATION); @@ -138,9 +137,11 @@ impl ScheduledIo { } /// Invoked when the IO driver is shut down; forces this ScheduledIo into a - /// permanently ready state. + /// permanently shutdown state. pub(super) fn shutdown(&self) { - self.wake0(Ready::ALL, true) + let mask = SHUTDOWN.pack(1, 0); + self.readiness.fetch_or(mask, AcqRel); + self.wake(Ready::ALL); } /// Sets the readiness on this `ScheduledIo` by invoking the given closure on @@ -219,16 +220,10 @@ impl ScheduledIo { /// than 32 wakers to notify, if the stack array fills up, the lock is /// released, the array is cleared, and the iteration continues. pub(super) fn wake(&self, ready: Ready) { - self.wake0(ready, false); - } - - fn wake0(&self, ready: Ready, shutdown: bool) { let mut wakers = WakeList::new(); let mut waiters = self.waiters.lock(); - waiters.is_shutdown |= shutdown; - // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { @@ -283,6 +278,7 @@ impl ScheduledIo { ReadyEvent { tick: TICK.unpack(curr) as u8, ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + is_shutdown: SHUTDOWN.unpack(curr) != 0, } } @@ -299,8 +295,9 @@ impl ScheduledIo { let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; - if ready.is_empty() { + if ready.is_empty() && !is_shutdown { // Update the task info let mut waiters = self.waiters.lock(); let slot = match direction { @@ -325,10 +322,12 @@ impl ScheduledIo { // taking the waiters lock let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - if waiters.is_shutdown { + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + if is_shutdown { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready: direction.mask(), + is_shutdown, }) } else if ready.is_empty() { Poll::Pending @@ -336,12 +335,14 @@ impl ScheduledIo { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready, + is_shutdown, }) } } else { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready, + is_shutdown, }) } } @@ -433,16 +434,17 @@ cfg_io_readiness! { // Optimistically check existing readiness let curr = scheduled_io.readiness.load(SeqCst); let ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; let ready = ready.intersection(interest); - if !ready.is_empty() { + if !ready.is_empty() || is_shutdown { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); + return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); } // Wasn't ready, take the lock (and check again while locked). @@ -450,18 +452,19 @@ cfg_io_readiness! { let curr = scheduled_io.readiness.load(SeqCst); let mut ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; - if waiters.is_shutdown { + if is_shutdown { ready = Ready::ALL; } let ready = ready.intersection(interest); - if !ready.is_empty() { + if !ready.is_empty() || is_shutdown { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); + return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); } // Not ready even after locked, insert into list... @@ -514,6 +517,7 @@ cfg_io_readiness! { let w = unsafe { &mut *waiter.get() }; let curr = scheduled_io.readiness.load(Acquire); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; // The returned tick might be newer than the event // which notified our waker. This is ok because the future @@ -528,6 +532,7 @@ cfg_io_readiness! { return Poll::Ready(ReadyEvent { tick, ready, + is_shutdown, }); } } diff --git a/src/runtime/metrics/batch.rs b/src/runtime/metrics/batch.rs index f1c3fa6..4e6b28d 100644 --- a/src/runtime/metrics/batch.rs +++ b/src/runtime/metrics/batch.rs @@ -11,9 +11,12 @@ pub(crate) struct MetricsBatch { /// Number of times the worker woke w/o doing work. noop_count: u64, - /// Number of times stolen. + /// Number of tasks stolen. steal_count: u64, + /// Number of times tasks where stolen. + steal_operations: u64, + /// Number of tasks that were polled by the worker. poll_count: u64, @@ -39,6 +42,7 @@ impl MetricsBatch { park_count: 0, noop_count: 0, steal_count: 0, + steal_operations: 0, poll_count: 0, poll_count_on_last_park: 0, local_schedule_count: 0, @@ -52,6 +56,9 @@ impl MetricsBatch { worker.park_count.store(self.park_count, Relaxed); worker.noop_count.store(self.noop_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); + worker + .steal_operations + .store(self.steal_operations, Relaxed); worker.poll_count.store(self.poll_count, Relaxed); worker @@ -98,6 +105,10 @@ cfg_rt_multi_thread! { self.steal_count += by as u64; } + pub(crate) fn incr_steal_operations(&mut self) { + self.steal_operations += 1; + } + pub(crate) fn incr_overflow_count(&mut self) { self.overflow_count += 1; } diff --git a/src/runtime/metrics/mock.rs b/src/runtime/metrics/mock.rs index 6b9cf70..c388dc0 100644 --- a/src/runtime/metrics/mock.rs +++ b/src/runtime/metrics/mock.rs @@ -38,6 +38,7 @@ impl MetricsBatch { cfg_rt_multi_thread! { impl MetricsBatch { pub(crate) fn incr_steal_count(&mut self, _by: u16) {} + pub(crate) fn incr_steal_operations(&mut self) {} pub(crate) fn incr_overflow_count(&mut self) {} } } diff --git a/src/runtime/metrics/runtime.rs b/src/runtime/metrics/runtime.rs index dee14a4..d29cb3d 100644 --- a/src/runtime/metrics/runtime.rs +++ b/src/runtime/metrics/runtime.rs @@ -210,10 +210,57 @@ impl RuntimeMetrics { .load(Relaxed) } + /// Returns the number of tasks the given worker thread stole from + /// another worker thread. + /// + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. + /// + /// The worker steal count starts at zero when the runtime is created and + /// increases by `N` each time the worker has processed its scheduled queue + /// and successfully steals `N` more pending tasks from another worker. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_steal_count(0); + /// println!("worker 0 has stolen {} tasks", n); + /// } + /// ``` + pub fn worker_steal_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .steal_count + .load(Relaxed) + } + /// Returns the number of times the given worker thread stole tasks from /// another worker thread. /// - /// This metric only applies to the **multi-threaded** runtime and will always return `0` when using the current thread runtime. + /// This metric only applies to the **multi-threaded** runtime and will + /// always return `0` when using the current thread runtime. /// /// The worker steal count starts at zero when the runtime is created and /// increases by one each time the worker has processed its scheduled queue @@ -243,15 +290,15 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_noop_count(0); + /// let n = metrics.worker_steal_operations(0); /// println!("worker 0 has stolen tasks {} times", n); /// } /// ``` - pub fn worker_steal_count(&self, worker: usize) -> u64 { + pub fn worker_steal_operations(&self, worker: usize) -> u64 { self.handle .inner .worker_metrics(worker) - .steal_count + .steal_operations .load(Relaxed) } @@ -328,8 +375,8 @@ impl RuntimeMetrics { /// async fn main() { /// let metrics = Handle::current().metrics(); /// - /// let n = metrics.worker_poll_count(0); - /// println!("worker 0 has polled {} tasks", n); + /// let n = metrics.worker_total_busy_duration(0); + /// println!("worker 0 was busy for a total of {:?}", n); /// } /// ``` pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { diff --git a/src/runtime/metrics/worker.rs b/src/runtime/metrics/worker.rs index ec58de6..a40c76e 100644 --- a/src/runtime/metrics/worker.rs +++ b/src/runtime/metrics/worker.rs @@ -17,9 +17,12 @@ pub(crate) struct WorkerMetrics { /// Number of times the worker woke then parked again without doing work. pub(crate) noop_count: AtomicU64, - /// Number of times the worker attempted to steal. + /// Number of tasks the worker stole. pub(crate) steal_count: AtomicU64, + /// Number of times the worker stole + pub(crate) steal_operations: AtomicU64, + /// Number of tasks the worker polled. pub(crate) poll_count: AtomicU64, @@ -43,6 +46,7 @@ impl WorkerMetrics { park_count: AtomicU64::new(0), noop_count: AtomicU64::new(0), steal_count: AtomicU64::new(0), + steal_operations: AtomicU64::new(0), poll_count: AtomicU64::new(0), overflow_count: AtomicU64::new(0), busy_duration_total: AtomicU64::new(0), diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 45b79b0..b6f43ea 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -237,6 +237,9 @@ cfg_rt! { mod runtime; pub use runtime::{Runtime, RuntimeFlavor}; + mod thread_id; + pub(crate) use thread_id::ThreadId; + cfg_metrics! { mod metrics; pub use metrics::RuntimeMetrics; diff --git a/src/runtime/runtime.rs b/src/runtime/runtime.rs index 9ede0a7..1985673 100644 --- a/src/runtime/runtime.rs +++ b/src/runtime/runtime.rs @@ -163,9 +163,9 @@ impl Runtime { /// thread pool. The thread pool is then responsible for polling the future /// until it completes. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the - /// background immediately when `spawn` is called. + /// The provided future will start running in the background immediately + /// when `spawn` is called, even if you don't await the returned + /// `JoinHandle`. /// /// See [module level][mod] documentation for more details. /// diff --git a/src/runtime/scheduler/multi_thread/queue.rs b/src/runtime/scheduler/multi_thread/queue.rs index 59b448d..faf56db 100644 --- a/src/runtime/scheduler/multi_thread/queue.rs +++ b/src/runtime/scheduler/multi_thread/queue.rs @@ -263,7 +263,7 @@ impl Local { // safety: The CAS above ensures that no consumer will look at these // values again, and we are the only producer. let batch_iter = BatchTaskIter { - buffer: &*self.inner.buffer, + buffer: &self.inner.buffer, head: head as UnsignedLong, i: 0, }; @@ -353,6 +353,7 @@ impl Steal { } dst_metrics.incr_steal_count(n as u16); + dst_metrics.incr_steal_operations(); // We are returning a task here n -= 1; diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs index c079297..8e3c3d1 100644 --- a/src/runtime/task/harness.rs +++ b/src/runtime/task/harness.rs @@ -194,7 +194,7 @@ where TransitionToRunning::Success => { let header_ptr = self.header_ptr(); let waker_ref = waker_ref::(&header_ptr); - let cx = Context::from_waker(&*waker_ref); + let cx = Context::from_waker(&waker_ref); let res = poll_future(self.core(), cx); if res == Poll::Ready(()) { @@ -315,9 +315,10 @@ where // this task. It is our responsibility to drop the // output. self.core().drop_future_or_output(); - } else if snapshot.has_join_waker() { - // Notify the join handle. The previous transition obtains the - // lock on the waker cell. + } else if snapshot.is_join_waker_set() { + // Notify the waker. Reading the waker field is safe per rule 4 + // in task/mod.rs, since the JOIN_WAKER bit is set and the call + // to transition_to_complete() above set the COMPLETE bit. self.trailer().wake_join(); } })); @@ -367,36 +368,30 @@ fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool { debug_assert!(snapshot.is_join_interested()); if !snapshot.is_complete() { - // The waker must be stored in the task struct. - let res = if snapshot.has_join_waker() { - // There already is a waker stored in the struct. If it matches - // the provided waker, then there is no further work to do. - // Otherwise, the waker must be swapped. - let will_wake = unsafe { - // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE` - // may mutate the `waker` field. - trailer.will_wake(waker) - }; - - if will_wake { - // The task is not complete **and** the waker is up to date, - // there is nothing further that needs to be done. + // If the task is not complete, try storing the provided waker in the + // task's waker field. + + let res = if snapshot.is_join_waker_set() { + // If JOIN_WAKER is set, then JoinHandle has previously stored a + // waker in the waker field per step (iii) of rule 5 in task/mod.rs. + + // Optimization: if the stored waker and the provided waker wake the + // same task, then return without touching the waker field. (Reading + // the waker field below is safe per rule 3 in task/mod.rs.) + if unsafe { trailer.will_wake(waker) } { return false; } - // Unset the `JOIN_WAKER` to gain mutable access to the `waker` - // field then update the field with the new join worker. - // - // This requires two atomic operations, unsetting the bit and - // then resetting it. If the task transitions to complete - // concurrently to either one of those operations, then setting - // the join waker fails and we proceed to reading the task - // output. + // Otherwise swap the stored waker with the provided waker by + // following the rule 5 in task/mod.rs. header .state .unset_waker() .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot)) } else { + // If JOIN_WAKER is unset, then JoinHandle has mutable access to the + // waker field per rule 2 in task/mod.rs; therefore, skip step (i) + // of rule 5 and try to store the provided waker in the waker field. set_join_waker(header, trailer, waker.clone(), snapshot) }; @@ -417,7 +412,7 @@ fn set_join_waker( snapshot: Snapshot, ) -> Result { assert!(snapshot.is_join_interested()); - assert!(!snapshot.has_join_waker()); + assert!(!snapshot.is_join_waker_set()); // Safety: Only the `JoinHandle` may set the `waker` field. When // `JOIN_INTEREST` is **not** set, nothing else will touch the field. diff --git a/src/runtime/task/id.rs b/src/runtime/task/id.rs new file mode 100644 index 0000000..2b0d95c --- /dev/null +++ b/src/runtime/task/id.rs @@ -0,0 +1,87 @@ +use crate::runtime::context; + +use std::fmt; + +/// An opaque ID that uniquely identifies a task relative to all other currently +/// running tasks. +/// +/// # Notes +/// +/// - Task IDs are unique relative to other *currently running* tasks. When a +/// task completes, the same ID may be used for another task. +/// - Task IDs are *not* sequential, and do not indicate the order in which +/// tasks are spawned, what runtime a task is spawned on, or any other data. +/// - The task ID of the currently running task can be obtained from inside the +/// task via the [`task::try_id()`](crate::task::try_id()) and +/// [`task::id()`](crate::task::id()) functions and from outside the task via +/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [unstable]: crate#unstable-features +#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] +pub struct Id(u64); + +/// Returns the [`Id`] of the currently running task. +/// +/// # Panics +/// +/// This function panics if called from outside a task. Please note that calls +/// to `block_on` do not have task IDs, so the method will panic if called from +/// within a call to `block_on`. For a version of this function that doesn't +/// panic, see [`task::try_id()`](crate::runtime::task::try_id()). +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [task ID]: crate::task::Id +/// [unstable]: crate#unstable-features +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[track_caller] +pub fn id() -> Id { + context::current_task_id().expect("Can't get a task id when not inside a task") +} + +/// Returns the [`Id`] of the currently running task, or `None` if called outside +/// of a task. +/// +/// This function is similar to [`task::id()`](crate::runtime::task::id()), except +/// that it returns `None` rather than panicking if called outside of a task +/// context. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [task ID]: crate::task::Id +/// [unstable]: crate#unstable-features +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[track_caller] +pub fn try_id() -> Option { + context::current_task_id() +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl Id { + pub(crate) fn next() -> Self { + use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64}; + + static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1); + + Self(NEXT_ID.fetch_add(1, Relaxed)) + } + + pub(crate) fn as_u64(&self) -> u64 { + self.0 + } +} diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs index 5660575..11c4b9b 100644 --- a/src/runtime/task/join.rs +++ b/src/runtime/task/join.rs @@ -11,9 +11,9 @@ cfg_rt! { /// An owned permission to join on a task (await its termination). /// /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] - /// for a Tokio task rather than a thread. You do not need to `.await` the - /// `JoinHandle` to make the task execute — it will start running in the - /// background immediately. + /// for a Tokio task rather than a thread. Note that the background task + /// associated with this `JoinHandle` started running immediately when you + /// called spawn, even if you have not yet awaited the `JoinHandle`. /// /// A `JoinHandle` *detaches* the associated task when it is dropped, which /// means that there is no longer any handle to the task, and no way to `join` diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index fea6e0f..55131ac 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -168,19 +168,20 @@ // unstable. This should be removed once `JoinSet` is stabilized. #![cfg_attr(not(tokio_unstable), allow(dead_code))] -use crate::runtime::context; - mod core; use self::core::Cell; use self::core::Header; mod error; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::error::JoinError; mod harness; use self::harness::Harness; +mod id; +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +pub use id::{id, try_id, Id}; + cfg_rt_multi_thread! { mod inject; pub(super) use self::inject::Inject; @@ -191,10 +192,8 @@ mod abort; mod join; #[cfg(feature = "rt")] -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::abort::AbortHandle; -#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::join::JoinHandle; mod list; @@ -215,70 +214,6 @@ use std::marker::PhantomData; use std::ptr::NonNull; use std::{fmt, mem}; -/// An opaque ID that uniquely identifies a task relative to all other currently -/// running tasks. -/// -/// # Notes -/// -/// - Task IDs are unique relative to other *currently running* tasks. When a -/// task completes, the same ID may be used for another task. -/// - Task IDs are *not* sequential, and do not indicate the order in which -/// tasks are spawned, what runtime a task is spawned on, or any other data. -/// - The task ID of the currently running task can be obtained from inside the -/// task via the [`task::try_id()`](crate::task::try_id()) and -/// [`task::id()`](crate::task::id()) functions and from outside the task via -/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function. -/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [unstable]: crate#unstable-features -#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] -pub struct Id(u64); - -/// Returns the [`Id`] of the currently running task. -/// -/// # Panics -/// -/// This function panics if called from outside a task. Please note that calls -/// to `block_on` do not have task IDs, so the method will panic if called from -/// within a call to `block_on`. For a version of this function that doesn't -/// panic, see [`task::try_id()`](crate::runtime::task::try_id()). -/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [task ID]: crate::task::Id -/// [unstable]: crate#unstable-features -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[track_caller] -pub fn id() -> Id { - context::current_task_id().expect("Can't get a task id when not inside a task") -} - -/// Returns the [`Id`] of the currently running task, or `None` if called outside -/// of a task. -/// -/// This function is similar to [`task::id()`](crate::runtime::task::id()), except -/// that it returns `None` rather than panicking if called outside of a task -/// context. -/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [task ID]: crate::task::Id -/// [unstable]: crate#unstable-features -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[track_caller] -pub fn try_id() -> Option { - context::current_task_id() -} - /// An owned handle to the task, tracked by ref count. #[repr(transparent)] pub(crate) struct Task { @@ -554,66 +489,3 @@ unsafe impl linked_list::Link for Task { self::core::Trailer::addr_of_owned(Header::get_trailer(target)) } } - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -impl Id { - // When 64-bit atomics are available, use a static `AtomicU64` counter to - // generate task IDs. - // - // Note(eliza): we _could_ just use `crate::loom::AtomicU64`, which switches - // between an atomic and mutex-based implementation here, rather than having - // two separate functions for targets with and without 64-bit atomics. - // However, because we can't use the mutex-based implementation in a static - // initializer directly, the 32-bit impl also has to use a `OnceCell`, and I - // thought it was nicer to avoid the `OnceCell` overhead on 64-bit - // platforms... - cfg_has_atomic_u64! { - pub(crate) fn next() -> Self { - use std::sync::atomic::{AtomicU64, Ordering::Relaxed}; - static NEXT_ID: AtomicU64 = AtomicU64::new(1); - Self(NEXT_ID.fetch_add(1, Relaxed)) - } - } - - cfg_not_has_atomic_u64! { - cfg_has_const_mutex_new! { - pub(crate) fn next() -> Self { - use crate::loom::sync::Mutex; - static NEXT_ID: Mutex = Mutex::const_new(1); - - let mut lock = NEXT_ID.lock(); - let id = *lock; - *lock += 1; - Self(id) - } - } - - cfg_not_has_const_mutex_new! { - pub(crate) fn next() -> Self { - use crate::util::once_cell::OnceCell; - use crate::loom::sync::Mutex; - - fn init_next_id() -> Mutex { - Mutex::new(1) - } - - static NEXT_ID: OnceCell> = OnceCell::new(); - - let next_id = NEXT_ID.get(init_next_id); - let mut lock = next_id.lock(); - let id = *lock; - *lock += 1; - Self(id) - } - } - } - - pub(crate) fn as_u64(&self) -> u64 { - self.0 - } -} diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs index c2d5b28..7728312 100644 --- a/src/runtime/task/state.rs +++ b/src/runtime/task/state.rs @@ -378,7 +378,7 @@ impl State { pub(super) fn set_join_waker(&self) -> UpdateResult { self.fetch_update(|curr| { assert!(curr.is_join_interested()); - assert!(!curr.has_join_waker()); + assert!(!curr.is_join_waker_set()); if curr.is_complete() { return None; @@ -398,7 +398,7 @@ impl State { pub(super) fn unset_waker(&self) -> UpdateResult { self.fetch_update(|curr| { assert!(curr.is_join_interested()); - assert!(curr.has_join_waker()); + assert!(curr.is_join_waker_set()); if curr.is_complete() { return None; @@ -546,7 +546,7 @@ impl Snapshot { self.0 &= !JOIN_INTEREST } - pub(super) fn has_join_waker(self) -> bool { + pub(super) fn is_join_waker_set(self) -> bool { self.0 & JOIN_WAKER == JOIN_WAKER } @@ -588,7 +588,7 @@ impl fmt::Debug for Snapshot { .field("is_notified", &self.is_notified()) .field("is_cancelled", &self.is_cancelled()) .field("is_join_interested", &self.is_join_interested()) - .field("has_join_waker", &self.has_join_waker()) + .field("is_join_waker_set", &self.is_join_waker_set()) .field("ref_count", &self.ref_count()) .finish() } diff --git a/src/runtime/tests/loom_blocking.rs b/src/runtime/tests/loom_blocking.rs index 89de85e..5c4aeae 100644 --- a/src/runtime/tests/loom_blocking.rs +++ b/src/runtime/tests/loom_blocking.rs @@ -73,6 +73,27 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread }); } +#[test] +fn spawn_blocking_when_paused() { + use std::time::Duration; + loom::model(|| { + let rt = crate::runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + let handle = rt.handle(); + let _enter = handle.enter(); + let a = crate::task::spawn_blocking(|| {}); + let b = crate::task::spawn_blocking(|| {}); + rt.block_on(crate::time::timeout(Duration::from_millis(1), async move { + a.await.expect("blocking task should finish"); + b.await.expect("blocking task should finish"); + })) + .expect("timeout should not trigger"); + }); +} + fn mk_runtime(num_threads: usize) -> Runtime { runtime::Builder::new_multi_thread() .worker_threads(num_threads) diff --git a/src/runtime/tests/loom_queue.rs b/src/runtime/tests/loom_queue.rs index 8d4e1d3..fc93bf3 100644 --- a/src/runtime/tests/loom_queue.rs +++ b/src/runtime/tests/loom_queue.rs @@ -1,6 +1,6 @@ -use crate::runtime::blocking::NoopSchedule; use crate::runtime::scheduler::multi_thread::queue; use crate::runtime::task::Inject; +use crate::runtime::tests::NoopSchedule; use crate::runtime::MetricsBatch; use loom::thread; diff --git a/src/runtime/tests/mod.rs b/src/runtime/tests/mod.rs index 1c67dfe..4e7c245 100644 --- a/src/runtime/tests/mod.rs +++ b/src/runtime/tests/mod.rs @@ -2,11 +2,29 @@ // other code when running loom tests. #![cfg_attr(loom, warn(dead_code, unreachable_pub))] +use self::noop_scheduler::NoopSchedule; use self::unowned_wrapper::unowned; +mod noop_scheduler { + use crate::runtime::task::{self, Task}; + + /// `task::Schedule` implementation that does nothing, for testing. + pub(crate) struct NoopSchedule; + + impl task::Schedule for NoopSchedule { + fn release(&self, _task: &Task) -> Option> { + None + } + + fn schedule(&self, _task: task::Notified) { + unreachable!(); + } + } +} + mod unowned_wrapper { - use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::{Id, JoinHandle, Notified}; + use crate::runtime::tests::NoopSchedule; #[cfg(all(tokio_unstable, feature = "tracing"))] pub(crate) fn unowned(task: T) -> (Notified, JoinHandle) diff --git a/src/runtime/tests/task.rs b/src/runtime/tests/task.rs index 173e5b0..a79c0f5 100644 --- a/src/runtime/tests/task.rs +++ b/src/runtime/tests/task.rs @@ -1,5 +1,5 @@ -use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::tests::NoopSchedule; use crate::util::TryLock; use std::collections::VecDeque; diff --git a/src/runtime/thread_id.rs b/src/runtime/thread_id.rs new file mode 100644 index 0000000..ef39289 --- /dev/null +++ b/src/runtime/thread_id.rs @@ -0,0 +1,31 @@ +use std::num::NonZeroU64; + +#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)] +pub(crate) struct ThreadId(NonZeroU64); + +impl ThreadId { + pub(crate) fn next() -> Self { + use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64}; + + static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(0); + + let mut last = NEXT_ID.load(Relaxed); + loop { + let id = match last.checked_add(1) { + Some(id) => id, + None => exhausted(), + }; + + match NEXT_ID.compare_exchange_weak(last, id, Relaxed, Relaxed) { + Ok(_) => return ThreadId(NonZeroU64::new(id).unwrap()), + Err(id) => last = id, + } + } + } +} + +#[cold] +#[allow(dead_code)] +fn exhausted() -> ! { + panic!("failed to generate unique thread ID: bitspace exhausted") +} diff --git a/src/runtime/time/mod.rs b/src/runtime/time/mod.rs index 240f8f1..f81cab8 100644 --- a/src/runtime/time/mod.rs +++ b/src/runtime/time/mod.rs @@ -222,7 +222,7 @@ impl Driver { let handle = rt_handle.time(); let clock = &handle.time_source.clock; - if clock.is_paused() { + if clock.can_auto_advance() { self.park.park_timeout(rt_handle, Duration::from_secs(0)); // If the time driver was woken, then the park completed diff --git a/src/signal/registry.rs b/src/signal/registry.rs index e1b3d10..48e98c8 100644 --- a/src/signal/registry.rs +++ b/src/signal/registry.rs @@ -5,7 +5,6 @@ use crate::sync::watch; use crate::util::once_cell::OnceCell; use std::ops; -use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; pub(crate) type EventId = usize; @@ -162,14 +161,14 @@ where } } -pub(crate) fn globals() -> Pin<&'static Globals> +pub(crate) fn globals() -> &'static Globals where OsExtraData: 'static + Send + Sync + Init, OsStorage: 'static + Send + Sync + Init, { static GLOBALS: OnceCell = OnceCell::new(); - Pin::new(GLOBALS.get(globals_init)) + GLOBALS.get(globals_init) } #[cfg(all(test, not(loom)))] diff --git a/src/signal/unix.rs b/src/signal/unix.rs index 0e1329e..e5345fd 100644 --- a/src/signal/unix.rs +++ b/src/signal/unix.rs @@ -14,7 +14,6 @@ use crate::sync::watch; use mio::net::UnixStream; use std::io::{self, Error, ErrorKind, Write}; -use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; use std::task::{Context, Poll}; @@ -240,7 +239,7 @@ impl Default for SignalInfo { /// 2. Wake up the driver by writing a byte to a pipe /// /// Those two operations should both be async-signal safe. -fn action(globals: Pin<&'static Globals>, signal: libc::c_int) { +fn action(globals: &'static Globals, signal: libc::c_int) { globals.record_event(signal as EventId); // Send a wakeup, ignore any errors (anything reasonably possible is diff --git a/src/sync/broadcast.rs b/src/sync/broadcast.rs index 452f3b1..1c6b2ca 100644 --- a/src/sync/broadcast.rs +++ b/src/sync/broadcast.rs @@ -18,6 +18,9 @@ //! returned [`Receiver`] will receive values sent **after** the call to //! `subscribe`. //! +//! This channel is also suitable for the single-producer multi-consumer +//! use-case, where a single sender broadcasts values to many receivers. +//! //! ## Lagging //! //! As sent messages must be retained until **all** [`Receiver`] handles receive @@ -600,6 +603,97 @@ impl Sender { new_receiver(shared) } + /// Returns the number of queued values. + /// + /// A value is queued until it has either been seen by all receivers that were alive at the time + /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the + /// queue's capacity. + /// + /// # Note + /// + /// In contrast to [`Receiver::len`], this method only reports queued values and not values that + /// have been evicted from the queue before being seen by all receivers. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// let mut rx2 = tx.subscribe(); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// tx.send(30).unwrap(); + /// + /// assert_eq!(tx.len(), 3); + /// + /// rx1.recv().await.unwrap(); + /// + /// // The len is still 3 since rx2 hasn't seen the first value yet. + /// assert_eq!(tx.len(), 3); + /// + /// rx2.recv().await.unwrap(); + /// + /// assert_eq!(tx.len(), 2); + /// } + /// ``` + pub fn len(&self) -> usize { + let tail = self.shared.tail.lock(); + + let base_idx = (tail.pos & self.shared.mask as u64) as usize; + let mut low = 0; + let mut high = self.shared.buffer.len(); + while low < high { + let mid = low + (high - low) / 2; + let idx = base_idx.wrapping_add(mid) & self.shared.mask; + if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 { + low = mid + 1; + } else { + high = mid; + } + } + + self.shared.buffer.len() - low + } + + /// Returns true if there are no queued values. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// let mut rx2 = tx.subscribe(); + /// + /// assert!(tx.is_empty()); + /// + /// tx.send(10).unwrap(); + /// + /// assert!(!tx.is_empty()); + /// + /// rx1.recv().await.unwrap(); + /// + /// // The queue is still not empty since rx2 hasn't seen the value. + /// assert!(!tx.is_empty()); + /// + /// rx2.recv().await.unwrap(); + /// + /// assert!(tx.is_empty()); + /// } + /// ``` + pub fn is_empty(&self) -> bool { + let tail = self.shared.tail.lock(); + + let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize; + self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 + } + /// Returns the number of active receivers /// /// An active receiver is a [`Receiver`] handle returned from [`channel`] or @@ -728,7 +822,7 @@ impl Receiver { /// assert_eq!(rx1.len(), 2); /// assert_eq!(rx1.recv().await.unwrap(), 10); /// assert_eq!(rx1.len(), 1); - /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert_eq!(rx1.recv().await.unwrap(), 20); /// assert_eq!(rx1.len(), 0); /// } /// ``` @@ -758,7 +852,7 @@ impl Receiver { /// /// assert!(!rx1.is_empty()); /// assert_eq!(rx1.recv().await.unwrap(), 10); - /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert_eq!(rx1.recv().await.unwrap(), 20); /// assert!(rx1.is_empty()); /// } /// ``` diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 457e6ab..8fba196 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -94,6 +94,10 @@ //! producers to a single consumer. This channel is often used to send work to a //! task or to receive the result of many computations. //! +//! This is also the channel you should use if you want to send many messages +//! from a single producer to a single consumer. There is no dedicated spsc +//! channel. +//! //! **Example:** using an mpsc to incrementally stream the results of a series //! of computations. //! @@ -244,6 +248,10 @@ //! This channel tends to be used less often than `oneshot` and `mpsc` but still //! has its use cases. //! +//! This is also the channel you should use if you want to broadcast values from +//! a single producer to many consumers. There is no dedicated spmc broadcast +//! channel. +//! //! Basic usage //! //! ``` diff --git a/src/sync/mpsc/block.rs b/src/sync/mpsc/block.rs index 58f4a9f..39c3e1b 100644 --- a/src/sync/mpsc/block.rs +++ b/src/sync/mpsc/block.rs @@ -1,6 +1,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize}; +use std::alloc::Layout; use std::mem::MaybeUninit; use std::ops; use std::ptr::{self, NonNull}; @@ -10,6 +11,17 @@ use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release}; /// /// Each block in the list can hold up to `BLOCK_CAP` messages. pub(crate) struct Block { + /// The header fields. + header: BlockHeader, + + /// Array containing values pushed into the block. Values are stored in a + /// continuous array in order to improve cache line behavior when reading. + /// The values must be manually dropped. + values: Values, +} + +/// Extra fields for a `Block`. +struct BlockHeader { /// The start index of this block. /// /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`. @@ -24,11 +36,6 @@ pub(crate) struct Block { /// The observed `tail_position` value *after* the block has been passed by /// `block_tail`. observed_tail_position: UnsafeCell, - - /// Array containing values pushed into the block. Values are stored in a - /// continuous array in order to improve cache line behavior when reading. - /// The values must be manually dropped. - values: Values, } pub(crate) enum Read { @@ -36,6 +43,7 @@ pub(crate) enum Read { Closed, } +#[repr(transparent)] struct Values([UnsafeCell>; BLOCK_CAP]); use super::BLOCK_CAP; @@ -71,28 +79,56 @@ pub(crate) fn offset(slot_index: usize) -> usize { SLOT_MASK & slot_index } +generate_addr_of_methods! { + impl Block { + unsafe fn addr_of_header(self: NonNull) -> NonNull> { + &self.header + } + + unsafe fn addr_of_values(self: NonNull) -> NonNull> { + &self.values + } + } +} + impl Block { - pub(crate) fn new(start_index: usize) -> Block { - Block { - // The absolute index in the channel of the first slot in the block. - start_index, + pub(crate) fn new(start_index: usize) -> Box> { + unsafe { + // Allocate the block on the heap. + // SAFETY: The size of the Block is non-zero, since it is at least the size of the header. + let block = std::alloc::alloc(Layout::new::>()) as *mut Block; + let block = match NonNull::new(block) { + Some(block) => block, + None => std::alloc::handle_alloc_error(Layout::new::>()), + }; + + // Write the header to the block. + Block::addr_of_header(block).as_ptr().write(BlockHeader { + // The absolute index in the channel of the first slot in the block. + start_index, - // Pointer to the next block in the linked list. - next: AtomicPtr::new(ptr::null_mut()), + // Pointer to the next block in the linked list. + next: AtomicPtr::new(ptr::null_mut()), - ready_slots: AtomicUsize::new(0), + ready_slots: AtomicUsize::new(0), - observed_tail_position: UnsafeCell::new(0), + observed_tail_position: UnsafeCell::new(0), + }); - // Value storage - values: unsafe { Values::uninitialized() }, + // Initialize the values array. + Values::initialize(Block::addr_of_values(block)); + + // Convert the pointer to a `Box`. + // Safety: The raw pointer was allocated using the global allocator, and with + // the layout for a `Block`, so it's valid to convert it to box. + Box::from_raw(block.as_ptr()) } } /// Returns `true` if the block matches the given index. pub(crate) fn is_at_index(&self, index: usize) -> bool { debug_assert!(offset(index) == 0); - self.start_index == index + self.header.start_index == index } /// Returns the number of blocks between `self` and the block at the @@ -101,7 +137,7 @@ impl Block { /// `start_index` must represent a block *after* `self`. pub(crate) fn distance(&self, other_index: usize) -> usize { debug_assert!(offset(other_index) == 0); - other_index.wrapping_sub(self.start_index) / BLOCK_CAP + other_index.wrapping_sub(self.header.start_index) / BLOCK_CAP } /// Reads the value at the given offset. @@ -116,7 +152,7 @@ impl Block { pub(crate) unsafe fn read(&self, slot_index: usize) -> Option> { let offset = offset(slot_index); - let ready_bits = self.ready_slots.load(Acquire); + let ready_bits = self.header.ready_slots.load(Acquire); if !is_ready(ready_bits, offset) { if is_tx_closed(ready_bits) { @@ -156,7 +192,7 @@ impl Block { /// Signal to the receiver that the sender half of the list is closed. pub(crate) unsafe fn tx_close(&self) { - self.ready_slots.fetch_or(TX_CLOSED, Release); + self.header.ready_slots.fetch_or(TX_CLOSED, Release); } /// Resets the block to a blank state. This enables reusing blocks in the @@ -169,9 +205,9 @@ impl Block { /// * All slots are empty. /// * The caller holds a unique pointer to the block. pub(crate) unsafe fn reclaim(&mut self) { - self.start_index = 0; - self.next = AtomicPtr::new(ptr::null_mut()); - self.ready_slots = AtomicUsize::new(0); + self.header.start_index = 0; + self.header.next = AtomicPtr::new(ptr::null_mut()); + self.header.ready_slots = AtomicUsize::new(0); } /// Releases the block to the rx half for freeing. @@ -187,19 +223,20 @@ impl Block { pub(crate) unsafe fn tx_release(&self, tail_position: usize) { // Track the observed tail_position. Any sender targeting a greater // tail_position is guaranteed to not access this block. - self.observed_tail_position + self.header + .observed_tail_position .with_mut(|ptr| *ptr = tail_position); // Set the released bit, signalling to the receiver that it is safe to // free the block's memory as soon as all slots **prior** to // `observed_tail_position` have been filled. - self.ready_slots.fetch_or(RELEASED, Release); + self.header.ready_slots.fetch_or(RELEASED, Release); } /// Mark a slot as ready fn set_ready(&self, slot: usize) { let mask = 1 << slot; - self.ready_slots.fetch_or(mask, Release); + self.header.ready_slots.fetch_or(mask, Release); } /// Returns `true` when all slots have their `ready` bits set. @@ -214,25 +251,31 @@ impl Block { /// single atomic cell. However, this could have negative impact on cache /// behavior as there would be many more mutations to a single slot. pub(crate) fn is_final(&self) -> bool { - self.ready_slots.load(Acquire) & READY_MASK == READY_MASK + self.header.ready_slots.load(Acquire) & READY_MASK == READY_MASK } /// Returns the `observed_tail_position` value, if set pub(crate) fn observed_tail_position(&self) -> Option { - if 0 == RELEASED & self.ready_slots.load(Acquire) { + if 0 == RELEASED & self.header.ready_slots.load(Acquire) { None } else { - Some(self.observed_tail_position.with(|ptr| unsafe { *ptr })) + Some( + self.header + .observed_tail_position + .with(|ptr| unsafe { *ptr }), + ) } } /// Loads the next block pub(crate) fn load_next(&self, ordering: Ordering) -> Option>> { - let ret = NonNull::new(self.next.load(ordering)); + let ret = NonNull::new(self.header.next.load(ordering)); debug_assert!(unsafe { - ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP)) - .unwrap_or(true) + ret.map(|block| { + block.as_ref().header.start_index == self.header.start_index.wrapping_add(BLOCK_CAP) + }) + .unwrap_or(true) }); ret @@ -260,9 +303,10 @@ impl Block { success: Ordering, failure: Ordering, ) -> Result<(), NonNull>> { - block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP); + block.as_mut().header.start_index = self.header.start_index.wrapping_add(BLOCK_CAP); let next_ptr = self + .header .next .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure) .unwrap_or_else(|x| x); @@ -291,7 +335,7 @@ impl Block { // Create the new block. It is assumed that the block will become the // next one after `&self`. If this turns out to not be the case, // `start_index` is updated accordingly. - let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP)); + let new_block = Block::new(self.header.start_index + BLOCK_CAP); let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) }; @@ -308,7 +352,8 @@ impl Block { // `Release` ensures that the newly allocated block is available to // other threads acquiring the next pointer. let next = NonNull::new( - self.next + self.header + .next .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire) .unwrap_or_else(|x| x), ); @@ -360,19 +405,20 @@ fn is_tx_closed(bits: usize) -> bool { } impl Values { - unsafe fn uninitialized() -> Values { - let mut vals = MaybeUninit::uninit(); - + /// Initialize a `Values` struct from a pointer. + /// + /// # Safety + /// + /// The raw pointer must be valid for writing a `Values`. + unsafe fn initialize(_value: NonNull>) { // When fuzzing, `UnsafeCell` needs to be initialized. if_loom! { - let p = vals.as_mut_ptr() as *mut UnsafeCell>; + let p = _value.as_ptr() as *mut UnsafeCell>; for i in 0..BLOCK_CAP { p.add(i) .write(UnsafeCell::new(MaybeUninit::uninit())); } } - - Values(vals.assume_init()) } } @@ -383,3 +429,20 @@ impl ops::Index for Values { self.0.index(index) } } + +#[cfg(all(test, not(loom)))] +#[test] +fn assert_no_stack_overflow() { + // https://github.com/tokio-rs/tokio/issues/5293 + + struct Foo { + _a: [u8; 2_000_000], + } + + assert_eq!( + Layout::new::>>(), + Layout::new::>() + ); + + let _block = Block::::new(0); +} diff --git a/src/sync/mpsc/list.rs b/src/sync/mpsc/list.rs index e4eeb45..10b2957 100644 --- a/src/sync/mpsc/list.rs +++ b/src/sync/mpsc/list.rs @@ -44,7 +44,7 @@ pub(crate) enum TryPopResult { pub(crate) fn channel() -> (Tx, Rx) { // Create the initial block shared between the tx and rx halves. - let initial_block = Box::new(Block::new(0)); + let initial_block = Block::new(0); let initial_block_ptr = Box::into_raw(initial_block); let tx = Tx { diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs index fff3091..33889fa 100644 --- a/src/sync/mpsc/mod.rs +++ b/src/sync/mpsc/mod.rs @@ -21,6 +21,9 @@ //! when additional capacity is available. In other words, the channel provides //! backpressure. //! +//! This channel is also suitable for the single-producer single-consumer +//! use-case. (Unless you only need to send one message, in which case you +//! should use the [oneshot] channel.) //! //! # Disconnection //! @@ -62,7 +65,7 @@ //! in mind, but they can also be generalized to other kinds of channels. In //! general, any channel method that isn't marked async can be called anywhere, //! including outside of the runtime. For example, sending a message on a -//! oneshot channel from outside the runtime is perfectly fine. +//! [oneshot] channel from outside the runtime is perfectly fine. //! //! # Multiple runtimes //! @@ -82,6 +85,7 @@ //! [blocking-recv]: crate::sync::mpsc::Receiver::blocking_recv() //! [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender //! [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver +//! [oneshot]: crate::sync::oneshot //! [`Handle::block_on`]: crate::runtime::Handle::block_on() //! [std-unbounded]: std::sync::mpsc::channel //! [crossbeam-unbounded]: https://docs.rs/crossbeam/*/crossbeam/channel/fn.unbounded.html diff --git a/src/task/join_set.rs b/src/task/join_set.rs index f181302..e6d8d62 100644 --- a/src/task/join_set.rs +++ b/src/task/join_set.rs @@ -120,9 +120,9 @@ impl JoinSet { /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] /// that can be used to remotely cancel the task. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the background - /// immediately when `spawn` is called. + /// The provided future will start running in the background immediately + /// when this method is called, even if you don't await anything on this + /// `JoinSet`. /// /// # Panics /// @@ -143,9 +143,9 @@ impl JoinSet { /// `JoinSet` returning an [`AbortHandle`] that can be used to remotely /// cancel the task. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the background - /// immediately when `spawn_on` is called. + /// The provided future will start running in the background immediately + /// when this method is called, even if you don't await anything on this + /// `JoinSet`. /// /// [`AbortHandle`]: crate::task::AbortHandle #[track_caller] @@ -162,9 +162,9 @@ impl JoinSet { /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely /// cancel the task. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the background - /// immediately when `spawn_local` is called. + /// The provided future will start running in the background immediately + /// when this method is called, even if you don't await anything on this + /// `JoinSet`. /// /// # Panics /// @@ -186,10 +186,8 @@ impl JoinSet { /// remotely cancel the task. /// /// Unlike the [`spawn_local`] method, this method may be used to spawn local - /// tasks when the `LocalSet` is _not_ running. You do not have to `.await` - /// the returned `JoinHandle` to make the provided future start execution. - /// It will start running immediately whenever the `LocalSet` is next - /// started. + /// tasks on a `LocalSet` that is _not_ currently running. The provided + /// future will start running whenever the `LocalSet` is next started. /// /// [`LocalSet`]: crate::task::LocalSet /// [`AbortHandle`]: crate::task::AbortHandle diff --git a/src/task/local.rs b/src/task/local.rs index e4a198b..0675faa 100644 --- a/src/task/local.rs +++ b/src/task/local.rs @@ -1,8 +1,8 @@ //! Runs `!Send` futures on the current thread. use crate::loom::cell::UnsafeCell; use crate::loom::sync::{Arc, Mutex}; -use crate::loom::thread::{self, ThreadId}; use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task}; +use crate::runtime::{context, ThreadId}; use crate::sync::AtomicWaker; use crate::util::RcCell; @@ -277,12 +277,10 @@ pin_project! { } tokio_thread_local!(static CURRENT: LocalData = const { LocalData { - thread_id: Cell::new(None), ctx: RcCell::new(), } }); struct LocalData { - thread_id: Cell>, ctx: RcCell, } @@ -291,9 +289,9 @@ cfg_rt! { /// /// The spawned future will run on the same thread that called `spawn_local`. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the background - /// immediately when `spawn_local` is called. + /// The provided future will start running in the background immediately + /// when `spawn_local` is called, even if you don't await the returned + /// `JoinHandle`. /// /// # Panics /// @@ -379,12 +377,14 @@ impl fmt::Debug for LocalEnterGuard { impl LocalSet { /// Returns a new local task set. pub fn new() -> LocalSet { + let owner = context::thread_id().expect("cannot create LocalSet during thread shutdown"); + LocalSet { tick: Cell::new(0), context: Rc::new(Context { shared: Arc::new(Shared { local_state: LocalState { - owner: thread_id().expect("cannot create LocalSet during thread shutdown"), + owner, owned: LocalOwnedTasks::new(), local_queue: UnsafeCell::new(VecDeque::with_capacity(INITIAL_CAPACITY)), }, @@ -417,10 +417,9 @@ impl LocalSet { /// This task is guaranteed to be run on the current thread. /// /// Unlike the free function [`spawn_local`], this method may be used to - /// spawn local tasks when the `LocalSet` is _not_ running. You do not have - /// to `.await` the returned `JoinHandle` to make the provided future start - /// execution. It will start running immediately whenever the `LocalSet` is - /// next started. + /// spawn local tasks when the `LocalSet` is _not_ running. The provided + /// future will start running once the `LocalSet` is next started, even if + /// you don't await the returned `JoinHandle`. /// /// # Examples /// @@ -949,7 +948,7 @@ impl Shared { // We are on the thread that owns the `LocalSet`, so we can // wake to the local queue. - _ if localdata.get_id() == Some(self.local_state.owner) => { + _ if context::thread_id().ok() == Some(self.local_state.owner) => { unsafe { // Safety: we just checked that the thread ID matches // the localset's owner, so this is safe. @@ -1093,7 +1092,9 @@ impl LocalState { // if we couldn't get the thread ID because we're dropping the local // data, skip the assertion --- the `Drop` impl is not going to be // called from another thread, because `LocalSet` is `!Send` - thread_id().map(|id| id == self.owner).unwrap_or(true), + context::thread_id() + .map(|id| id == self.owner) + .unwrap_or(true), "`LocalSet`'s local run queue must not be accessed by another thread!" ); } @@ -1103,26 +1104,6 @@ impl LocalState { // ensure they are on the same thread that owns the `LocalSet`. unsafe impl Send for LocalState {} -impl LocalData { - fn get_id(&self) -> Option { - self.thread_id.get() - } - - fn get_or_insert_id(&self) -> ThreadId { - self.thread_id.get().unwrap_or_else(|| { - let id = thread::current().id(); - self.thread_id.set(Some(id)); - id - }) - } -} - -fn thread_id() -> Option { - CURRENT - .try_with(|localdata| localdata.get_or_insert_id()) - .ok() -} - #[cfg(all(test, not(loom)))] mod tests { use super::*; diff --git a/src/task/spawn.rs b/src/task/spawn.rs index 5db11a4..66b0d67 100644 --- a/src/task/spawn.rs +++ b/src/task/spawn.rs @@ -7,9 +7,9 @@ cfg_rt! { /// Spawns a new asynchronous task, returning a /// [`JoinHandle`](super::JoinHandle) for it. /// - /// You do not have to `.await` the returned `JoinHandle` to make the - /// provided future start execution. It will start running in the background - /// immediately when `spawn` is called. + /// The provided future will start running in the background immediately + /// when `spawn` is called, even if you don't await the returned + /// `JoinHandle`. /// /// Spawning a task enables the task to execute concurrently to other tasks. The /// spawned task may execute on the current thread, or it may be sent to a diff --git a/src/time/clock.rs b/src/time/clock.rs index 0343c4f..cd11a67 100644 --- a/src/time/clock.rs +++ b/src/time/clock.rs @@ -65,6 +65,9 @@ cfg_test_util! { /// Instant at which the clock was last unfrozen. unfrozen: Option, + + /// Number of `inhibit_auto_advance` calls still in effect. + auto_advance_inhibit_count: usize, } /// Pauses time. @@ -187,6 +190,7 @@ cfg_test_util! { enable_pausing, base: now, unfrozen: Some(now), + auto_advance_inhibit_count: 0, })), }; @@ -212,9 +216,20 @@ cfg_test_util! { inner.unfrozen = None; } - pub(crate) fn is_paused(&self) -> bool { + /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`). + pub(crate) fn inhibit_auto_advance(&self) { + let mut inner = self.inner.lock(); + inner.auto_advance_inhibit_count += 1; + } + + pub(crate) fn allow_auto_advance(&self) { + let mut inner = self.inner.lock(); + inner.auto_advance_inhibit_count -= 1; + } + + pub(crate) fn can_auto_advance(&self) -> bool { let inner = self.inner.lock(); - inner.unfrozen.is_none() + inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0 } #[track_caller] diff --git a/src/time/sleep.rs b/src/time/sleep.rs index d974e1a..0a012e2 100644 --- a/src/time/sleep.rs +++ b/src/time/sleep.rs @@ -357,7 +357,7 @@ impl Sleep { fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { let mut me = self.project(); me.entry.as_mut().reset(deadline); - (*me.inner).deadline = deadline; + (me.inner).deadline = deadline; #[cfg(all(tokio_unstable, feature = "tracing"))] { diff --git a/src/util/linked_list.rs b/src/util/linked_list.rs index 9698f72..b46bd6d 100644 --- a/src/util/linked_list.rs +++ b/src/util/linked_list.rs @@ -126,7 +126,7 @@ impl LinkedList { pub(crate) fn push_front(&mut self, val: L::Handle) { // The value should not be dropped, it is being inserted into the list let val = ManuallyDrop::new(val); - let ptr = L::as_raw(&*val); + let ptr = L::as_raw(&val); assert_ne!(self.head, Some(ptr)); unsafe { L::pointers(ptr).as_mut().set_next(self.head); diff --git a/src/util/mod.rs b/src/util/mod.rs index 245e64d..9f6119a 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -6,7 +6,12 @@ cfg_io_driver! { #[cfg(feature = "rt")] pub(crate) mod atomic_cell; -#[cfg(any(feature = "rt", feature = "signal", feature = "process"))] +#[cfg(any( + feature = "rt", + feature = "signal", + feature = "process", + tokio_no_const_mutex_new, +))] pub(crate) mod once_cell; #[cfg(any( diff --git a/src/util/once_cell.rs b/src/util/once_cell.rs index 138d2a7..1925f0a 100644 --- a/src/util/once_cell.rs +++ b/src/util/once_cell.rs @@ -25,7 +25,7 @@ impl OnceCell { /// If the `init` closure panics, then the `OnceCell` is poisoned and all /// future calls to `get` will panic. #[inline] - pub(crate) fn get(&self, init: fn() -> T) -> &T { + pub(crate) fn get(&self, init: impl FnOnce() -> T) -> &T { if !self.once.is_completed() { self.do_init(init); } @@ -41,7 +41,7 @@ impl OnceCell { } #[cold] - fn do_init(&self, init: fn() -> T) { + fn do_init(&self, init: impl FnOnce() -> T) { let value_ptr = self.value.get() as *mut T; self.once.call_once(|| { diff --git a/tests/_require_full.rs b/tests/_require_full.rs index a339374..4b9698a 100644 --- a/tests/_require_full.rs +++ b/tests/_require_full.rs @@ -1,2 +1,8 @@ -#![cfg(not(any(feature = "full", tokio_wasm)))] +#[cfg(not(any(feature = "full", tokio_wasm)))] compile_error!("run main Tokio tests with `--features full`"); + +// CI sets `--cfg tokio_no_parking_lot` when trying to run tests with +// `parking_lot` disabled. This check prevents "silent failure" if `parking_lot` +// accidentally gets enabled. +#[cfg(all(tokio_no_parking_lot, feature = "parking_lot"))] +compile_error!("parking_lot feature enabled when it should not be"); diff --git a/tests/buffered.rs b/tests/buffered.rs index 19afebd..4251c3f 100644 --- a/tests/buffered.rs +++ b/tests/buffered.rs @@ -18,10 +18,10 @@ async fn echo_server() { let msg = "foo bar baz"; let t = thread::spawn(move || { - let mut s = assert_ok!(TcpStream::connect(&addr)); + let mut s = assert_ok!(TcpStream::connect(addr)); let t2 = thread::spawn(move || { - let mut s = assert_ok!(TcpStream::connect(&addr)); + let mut s = assert_ok!(TcpStream::connect(addr)); let mut b = vec![0; msg.len() * N]; assert_ok!(s.read_exact(&mut b)); b diff --git a/tests/io_driver.rs b/tests/io_driver.rs index 2ca5630..97018e0 100644 --- a/tests/io_driver.rs +++ b/tests/io_driver.rs @@ -80,7 +80,7 @@ fn test_drop_on_notify() { drop(task); // Establish a connection to the acceptor - let _s = TcpStream::connect(&addr).unwrap(); + let _s = TcpStream::connect(addr).unwrap(); // Force the reactor to turn rt.block_on(async {}); diff --git a/tests/macros_join.rs b/tests/macros_join.rs index 4441582..9e4d234 100644 --- a/tests/macros_join.rs +++ b/tests/macros_join.rs @@ -1,5 +1,5 @@ #![cfg(feature = "macros")] -#![allow(clippy::blacklisted_name)] +#![allow(clippy::disallowed_names)] use std::sync::Arc; #[cfg(tokio_wasm_not_wasi)] diff --git a/tests/macros_select.rs b/tests/macros_select.rs index 60f3738..26d6fec 100644 --- a/tests/macros_select.rs +++ b/tests/macros_select.rs @@ -1,5 +1,5 @@ #![cfg(feature = "macros")] -#![allow(clippy::blacklisted_name)] +#![allow(clippy::disallowed_names)] #[cfg(tokio_wasm_not_wasi)] use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; diff --git a/tests/macros_try_join.rs b/tests/macros_try_join.rs index 209516b..6c43222 100644 --- a/tests/macros_try_join.rs +++ b/tests/macros_try_join.rs @@ -1,5 +1,5 @@ #![cfg(feature = "macros")] -#![allow(clippy::blacklisted_name)] +#![allow(clippy::disallowed_names)] use std::sync::Arc; diff --git a/tests/rt_common.rs b/tests/rt_common.rs index 53248b2..3892998 100644 --- a/tests/rt_common.rs +++ b/tests/rt_common.rs @@ -661,7 +661,7 @@ rt_test! { loop { // Don't use Tokio's `yield_now()` to avoid special defer // logic. - let _: () = futures::future::poll_fn(|cx| { + futures::future::poll_fn::<(), _>(|cx| { cx.waker().wake_by_ref(); std::task::Poll::Pending }).await; diff --git a/tests/rt_metrics.rs b/tests/rt_metrics.rs index 2a9f998..fdb2fb5 100644 --- a/tests/rt_metrics.rs +++ b/tests/rt_metrics.rs @@ -31,6 +31,19 @@ fn num_idle_blocking_threads() { rt.block_on(async { time::sleep(Duration::from_millis(5)).await; }); + + // We need to wait until the blocking thread has become idle. Usually 5ms is + // enough for this to happen, but not always. When it isn't enough, sleep + // for another second. We don't always wait for a whole second since we want + // the test suite to finish quickly. + // + // Note that the timeout for idle threads to be killed is 10 seconds. + if 0 == rt.metrics().num_idle_blocking_threads() { + rt.block_on(async { + time::sleep(Duration::from_secs(1)).await; + }); + } + assert_eq!(1, rt.metrics().num_idle_blocking_threads()); } @@ -128,7 +141,7 @@ fn worker_noop_count() { time::sleep(Duration::from_millis(1)).await; }); drop(rt); - assert!(2 <= metrics.worker_noop_count(0)); + assert!(0 < metrics.worker_noop_count(0)); let rt = threaded(); let metrics = rt.metrics(); @@ -136,8 +149,8 @@ fn worker_noop_count() { time::sleep(Duration::from_millis(1)).await; }); drop(rt); - assert!(1 <= metrics.worker_noop_count(0)); - assert!(1 <= metrics.worker_noop_count(1)); + assert!(0 < metrics.worker_noop_count(0)); + assert!(0 < metrics.worker_noop_count(1)); } #[test] diff --git a/tests/support/leaked_buffers.rs b/tests/support/leaked_buffers.rs index 3ee8a18..a6079fb 100644 --- a/tests/support/leaked_buffers.rs +++ b/tests/support/leaked_buffers.rs @@ -18,9 +18,9 @@ impl LeakedBuffers { } } pub unsafe fn create<'a>(&mut self, size: usize) -> &'a mut [u8] { - let mut new_mem = vec![0u8; size].into_boxed_slice(); - let slice = std::slice::from_raw_parts_mut(new_mem.as_mut_ptr(), new_mem.len()); + let new_mem = vec![0u8; size].into_boxed_slice(); self.leaked_vecs.push(new_mem); - slice + let new_mem = self.leaked_vecs.last_mut().unwrap(); + std::slice::from_raw_parts_mut(new_mem.as_mut_ptr(), new_mem.len()) } } diff --git a/tests/support/panic.rs b/tests/support/panic.rs index 7f60c76..df2f59d 100644 --- a/tests/support/panic.rs +++ b/tests/support/panic.rs @@ -1,9 +1,8 @@ -use parking_lot::{const_mutex, Mutex}; use std::panic; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; pub fn test_panic(func: Func) -> Option { - static PANIC_MUTEX: Mutex<()> = const_mutex(()); + static PANIC_MUTEX: Mutex<()> = Mutex::new(()); { let _guard = PANIC_MUTEX.lock(); @@ -16,6 +15,7 @@ pub fn test_panic(func: Func) -> Option(func: Func) -> Option { + let _ = rx1.try_recv(); + } + 1 => { + let _ = rx2.try_recv(); + } + _ => { + tx.send(0).unwrap(); + } + } + + let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16); + assert_eq!(tx.len(), expected_len); + } +} diff --git a/tests/sync_once_cell.rs b/tests/sync_once_cell.rs index 18eaf93..38dfa7c 100644 --- a/tests/sync_once_cell.rs +++ b/tests/sync_once_cell.rs @@ -4,178 +4,7 @@ use std::mem; use std::ops::Drop; use std::sync::atomic::{AtomicU32, Ordering}; -use std::time::Duration; -use tokio::runtime; -use tokio::sync::{OnceCell, SetError}; -use tokio::time; - -async fn func1() -> u32 { - 5 -} - -async fn func2() -> u32 { - time::sleep(Duration::from_millis(1)).await; - 10 -} - -async fn func_err() -> Result { - Err(()) -} - -async fn func_ok() -> Result { - Ok(10) -} - -async fn func_panic() -> u32 { - time::sleep(Duration::from_millis(1)).await; - panic!(); -} - -async fn sleep_and_set() -> u32 { - // Simulate sleep by pausing time and waiting for another thread to - // resume clock when calling `set`, then finding the cell being initialized - // by this call - time::sleep(Duration::from_millis(2)).await; - 5 -} - -async fn advance_time_and_set(cell: &'static OnceCell, v: u32) -> Result<(), SetError> { - time::advance(Duration::from_millis(1)).await; - cell.set(v) -} - -#[test] -fn get_or_init() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .start_paused(true) - .build() - .unwrap(); - - static ONCE: OnceCell = OnceCell::const_new(); - - rt.block_on(async { - let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); - let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); - - time::advance(Duration::from_millis(1)).await; - time::resume(); - - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); - - assert_eq!(*result1, 5); - assert_eq!(*result2, 5); - }); -} - -#[test] -fn get_or_init_panic() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); - - static ONCE: OnceCell = OnceCell::const_new(); - - rt.block_on(async { - time::pause(); - - let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); - let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await }); - - time::advance(Duration::from_millis(1)).await; - - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); - - assert_eq!(*result1, 5); - assert_eq!(*result2, 5); - }); -} - -#[test] -fn set_and_get() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); - - static ONCE: OnceCell = OnceCell::const_new(); - - rt.block_on(async { - let _ = rt.spawn(async { ONCE.set(5) }).await; - let value = ONCE.get().unwrap(); - assert_eq!(*value, 5); - }); -} - -#[test] -fn get_uninit() { - static ONCE: OnceCell = OnceCell::const_new(); - let uninit = ONCE.get(); - assert!(uninit.is_none()); -} - -#[test] -fn set_twice() { - static ONCE: OnceCell = OnceCell::const_new(); - - let first = ONCE.set(5); - assert_eq!(first, Ok(())); - let second = ONCE.set(6); - assert!(second.err().unwrap().is_already_init_err()); -} - -#[test] -fn set_while_initializing() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); - - static ONCE: OnceCell = OnceCell::const_new(); - - rt.block_on(async { - time::pause(); - - let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await }); - let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); - - time::advance(Duration::from_millis(2)).await; - - let result1 = handle1.await.unwrap(); - let result2 = handle2.await.unwrap(); - - assert_eq!(*result1, 5); - assert!(result2.err().unwrap().is_initializing_err()); - }); -} - -#[test] -fn get_or_try_init() { - let rt = runtime::Builder::new_current_thread() - .enable_time() - .start_paused(true) - .build() - .unwrap(); - - static ONCE: OnceCell = OnceCell::const_new(); - - rt.block_on(async { - let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await }); - let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await }); - - time::advance(Duration::from_millis(1)).await; - time::resume(); - - let result1 = handle1.await.unwrap(); - assert!(result1.is_err()); - - let result2 = handle2.await.unwrap(); - assert_eq!(*result2.unwrap(), 10); - }); -} +use tokio::sync::OnceCell; #[test] fn drop_cell() { @@ -272,3 +101,185 @@ fn from() { let cell = OnceCell::from(2); assert_eq!(*cell.get().unwrap(), 2); } + +#[cfg(feature = "parking_lot")] +mod parking_lot { + use super::*; + + use tokio::runtime; + use tokio::sync::SetError; + use tokio::time; + + use std::time::Duration; + + async fn func1() -> u32 { + 5 + } + + async fn func2() -> u32 { + time::sleep(Duration::from_millis(1)).await; + 10 + } + + async fn func_err() -> Result { + Err(()) + } + + async fn func_ok() -> Result { + Ok(10) + } + + async fn func_panic() -> u32 { + time::sleep(Duration::from_millis(1)).await; + panic!(); + } + + async fn sleep_and_set() -> u32 { + // Simulate sleep by pausing time and waiting for another thread to + // resume clock when calling `set`, then finding the cell being initialized + // by this call + time::sleep(Duration::from_millis(2)).await; + 5 + } + + async fn advance_time_and_set( + cell: &'static OnceCell, + v: u32, + ) -> Result<(), SetError> { + time::advance(Duration::from_millis(1)).await; + cell.set(v) + } + + #[test] + fn get_or_init() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); + + time::advance(Duration::from_millis(1)).await; + time::resume(); + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); + + assert_eq!(*result1, 5); + assert_eq!(*result2, 5); + }); + } + + #[test] + fn get_or_init_panic() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + time::pause(); + + let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await }); + + time::advance(Duration::from_millis(1)).await; + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); + + assert_eq!(*result1, 5); + assert_eq!(*result2, 5); + }); + } + + #[test] + fn set_and_get() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + let _ = rt.spawn(async { ONCE.set(5) }).await; + let value = ONCE.get().unwrap(); + assert_eq!(*value, 5); + }); + } + + #[test] + fn get_uninit() { + static ONCE: OnceCell = OnceCell::const_new(); + let uninit = ONCE.get(); + assert!(uninit.is_none()); + } + + #[test] + fn set_twice() { + static ONCE: OnceCell = OnceCell::const_new(); + + let first = ONCE.set(5); + assert_eq!(first, Ok(())); + let second = ONCE.set(6); + assert!(second.err().unwrap().is_already_init_err()); + } + + #[test] + fn set_while_initializing() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + time::pause(); + + let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await }); + let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); + + time::advance(Duration::from_millis(2)).await; + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); + + assert_eq!(*result1, 5); + assert!(result2.err().unwrap().is_initializing_err()); + }); + } + + #[test] + fn get_or_try_init() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await }); + let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await }); + + time::advance(Duration::from_millis(1)).await; + time::resume(); + + let result1 = handle1.await.unwrap(); + assert!(result1.is_err()); + + let result2 = handle2.await.unwrap(); + assert_eq!(*result2.unwrap(), 10); + }); + } +} diff --git a/tests/task_blocking.rs b/tests/task_blocking.rs index 2e0881c..d82a0e0 100644 --- a/tests/task_blocking.rs +++ b/tests/task_blocking.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads -use tokio::{runtime, task}; +use tokio::{runtime, task, time}; use tokio_test::assert_ok; use std::thread; @@ -227,3 +227,84 @@ fn coop_disabled_in_block_in_place_in_block_on() { done_rx.recv().unwrap().unwrap(); } + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn blocking_when_paused() { + // Do not auto-advance time when we have started a blocking task that has + // not yet finished. + time::timeout( + Duration::from_secs(3), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); + + // Really: Do not auto-advance time, even if the timeout is short and the + // blocking task runs for longer than that. It doesn't matter: Tokio time + // is paused; system time is not. + time::timeout( + Duration::from_millis(1), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))), + ) + .await + .expect("timeout should not trigger") + .expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn unawaited_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + + // When this task finishes, time should auto-advance, even though the + // JoinHandle has not been awaited yet. + let a = task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(1)); + }); + + crate::time::sleep(Duration::from_secs(15)).await; + a.await.expect("blocking task should finish"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn panicking_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + let result = time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(1)); + panic!("blocking task panicked"); + }), + ) + .await + .expect("timeout should not trigger"); + assert!(result.is_err(), "blocking task should have panicked"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +} diff --git a/tests/task_join_set.rs b/tests/task_join_set.rs index 20d4927..b1b6cf9 100644 --- a/tests/task_join_set.rs +++ b/tests/task_join_set.rs @@ -5,8 +5,6 @@ use tokio::sync::oneshot; use tokio::task::JoinSet; use tokio::time::Duration; -use futures::future::FutureExt; - fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_current_thread() .build() @@ -156,49 +154,6 @@ fn runtime_gone() { .is_cancelled()); } -// This ensures that `join_next` works correctly when the coop budget is -// exhausted. -#[tokio::test(flavor = "current_thread")] -async fn join_set_coop() { - // Large enough to trigger coop. - const TASK_NUM: u32 = 1000; - - static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); - - let mut set = JoinSet::new(); - - for _ in 0..TASK_NUM { - set.spawn(async { - SEM.add_permits(1); - }); - } - - // Wait for all tasks to complete. - // - // Since this is a `current_thread` runtime, there's no race condition - // between the last permit being added and the task completing. - let _ = SEM.acquire_many(TASK_NUM).await.unwrap(); - - let mut count = 0; - let mut coop_count = 0; - loop { - match set.join_next().now_or_never() { - Some(Some(Ok(()))) => {} - Some(Some(Err(err))) => panic!("failed: {}", err), - None => { - coop_count += 1; - tokio::task::yield_now().await; - continue; - } - Some(None) => break, - } - - count += 1; - } - assert!(coop_count >= 1); - assert_eq!(count, TASK_NUM); -} - #[tokio::test(start_paused = true)] async fn abort_all() { let mut set: JoinSet<()> = JoinSet::new(); @@ -228,3 +183,53 @@ async fn abort_all() { assert_eq!(count, 10); assert_eq!(set.len(), 0); } + +#[cfg(feature = "parking_lot")] +mod parking_lot { + use super::*; + + use futures::future::FutureExt; + + // This ensures that `join_next` works correctly when the coop budget is + // exhausted. + #[tokio::test(flavor = "current_thread")] + async fn join_set_coop() { + // Large enough to trigger coop. + const TASK_NUM: u32 = 1000; + + static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0); + + let mut set = JoinSet::new(); + + for _ in 0..TASK_NUM { + set.spawn(async { + SEM.add_permits(1); + }); + } + + // Wait for all tasks to complete. + // + // Since this is a `current_thread` runtime, there's no race condition + // between the last permit being added and the task completing. + let _ = SEM.acquire_many(TASK_NUM).await.unwrap(); + + let mut count = 0; + let mut coop_count = 0; + loop { + match set.join_next().now_or_never() { + Some(Some(Ok(()))) => {} + Some(Some(Err(err))) => panic!("failed: {}", err), + None => { + coop_count += 1; + tokio::task::yield_now().await; + continue; + } + Some(None) => break, + } + + count += 1; + } + assert!(coop_count >= 1); + assert_eq!(count, TASK_NUM); + } +} diff --git a/tests/tcp_peek.rs b/tests/tcp_peek.rs index 03813c2..b712023 100644 --- a/tests/tcp_peek.rs +++ b/tests/tcp_peek.rs @@ -15,7 +15,7 @@ async fn peek() { let addr = listener.local_addr().unwrap(); let t = thread::spawn(move || assert_ok!(listener.accept()).0); - let left = net::TcpStream::connect(&addr).unwrap(); + let left = net::TcpStream::connect(addr).unwrap(); let mut right = t.join().unwrap(); let _ = right.write(&[1, 2, 3, 4]).unwrap(); -- cgit v1.2.3