path: root/src/runtime
author    Haibo Huang <hhb@google.com>  2021-01-14 17:23:22 -0800
committer Jeff Vander Stoep <jeffv@google.com>  2021-01-15 20:44:08 +0100
commit    290fc4903cd00fc31d93e0ecd49c402e6833c569 (patch)
tree      4a9646d2ab712bae1ead875992160c7248588daf /src/runtime
parent    84cad6596f48e471881980dcba7df9cb5b4b0139 (diff)
download  tokio-290fc4903cd00fc31d93e0ecd49c402e6833c569.tar.gz
Upgrade rust/crates/tokio to 1.0.2 (platform-tools-31.0.0)
Test: make
Change-Id: Ic48ff709bade266749eac8c146856901ce78da7f
Diffstat (limited to 'src/runtime')
-rw-r--r--  src/runtime/blocking/shutdown.rs       2
-rw-r--r--  src/runtime/builder.rs                39
-rw-r--r--  src/runtime/driver.rs                  9
-rw-r--r--  src/runtime/handle.rs                  4
-rw-r--r--  src/runtime/mod.rs                     4
-rw-r--r--  src/runtime/thread_pool/worker.rs    137
6 files changed, 115 insertions, 80 deletions
diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs
index 3b6cc59..0cf2285 100644
--- a/src/runtime/blocking/shutdown.rs
+++ b/src/runtime/blocking/shutdown.rs
@@ -38,7 +38,7 @@ impl Receiver {
use crate::runtime::enter::try_enter;
if timeout == Some(Duration::from_nanos(0)) {
- return true;
+ return false;
}
let mut e = match try_enter(false) {
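The one-line fix above makes a zero timeout report failure (blocking pool not drained) instead of claiming success. For context, this is the path a zero-duration shutdown takes from user code; the sketch below uses only the public tokio 1.x runtime API and is not part of this diff (`shutdown_background` is, as far as I know, effectively a zero-duration `shutdown_timeout`):

```rust
// Sketch of how a zero-duration shutdown is typically reached in tokio 1.x.
// Returning `false` above lets this path report "not fully drained" rather
// than pretending the blocking pool finished.
use std::time::Duration;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build runtime");

    rt.spawn_blocking(|| {
        // A blocking job that may still be running when we shut down.
        std::thread::sleep(Duration::from_secs(10));
    });

    // Comparable to `rt.shutdown_timeout(Duration::from_nanos(0))`:
    // returns immediately instead of waiting for the job above.
    rt.shutdown_background();
}
```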
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs
index e792c7d..1f8892e 100644
--- a/src/runtime/builder.rs
+++ b/src/runtime/builder.rs
@@ -53,7 +53,7 @@ pub struct Builder {
worker_threads: Option<usize>,
/// Cap on thread usage.
- max_threads: usize,
+ max_blocking_threads: usize,
/// Name fn used for threads spawned by the runtime.
pub(super) thread_name: ThreadNameFn,
@@ -113,7 +113,7 @@ impl Builder {
// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,
- max_threads: 512,
+ max_blocking_threads: 512,
// Default thread name
thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
@@ -209,22 +209,22 @@ impl Builder {
self
}
- /// Specifies limit for threads, spawned by the Runtime.
+ /// Specifies limit for threads spawned by the Runtime used for blocking operations.
///
- /// This is number of threads to be used by Runtime, including `core_threads`
- /// Having `max_threads` less than `worker_threads` results in invalid configuration
- /// when building multi-threaded `Runtime`, which would cause a panic.
///
- /// Similarly to the `worker_threads`, this number should be between 0 and 32,768.
+ /// Similarly to the `worker_threads`, this number should be between 1 and 32,768.
///
/// The default value is 512.
///
- /// When multi-threaded runtime is not used, will act as limit on additional threads.
+ /// Otherwise as `worker_threads` are always active, it limits additional threads (e.g. for
+ /// blocking annotations).
///
- /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for
- /// blocking annotations) as `max_threads - core_threads`.
- pub fn max_threads(&mut self, val: usize) -> &mut Self {
- self.max_threads = val;
+ /// # Panic
+ ///
+ /// This will panic if `val` is not larger than `0`.
+ pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
+ assert!(val > 0, "Max blocking threads cannot be set to 0");
+ self.max_blocking_threads = val;
self
}
@@ -379,6 +379,11 @@ impl Builder {
fn get_cfg(&self) -> driver::Cfg {
driver::Cfg {
+ enable_pause_time: match self.kind {
+ Kind::CurrentThread => true,
+ #[cfg(feature = "rt-multi-thread")]
+ Kind::MultiThread => false,
+ },
enable_io: self.enable_io,
enable_time: self.enable_time,
}
@@ -419,7 +424,7 @@ impl Builder {
let spawner = Spawner::Basic(scheduler.spawner().clone());
// Blocking pool
- let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
@@ -490,10 +495,8 @@ cfg_rt_multi_thread! {
use crate::loom::sys::num_cpus;
use crate::runtime::{Kind, ThreadPool};
use crate::runtime::park::Parker;
- use std::cmp;
- let core_threads = self.worker_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
- assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");
+ let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
@@ -501,7 +504,7 @@ cfg_rt_multi_thread! {
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
// Create the blocking pool
- let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
// Create the runtime handle
@@ -531,7 +534,7 @@ impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
.field("worker_threads", &self.worker_threads)
- .field("max_threads", &self.max_threads)
+ .field("max_blocking_threads", &self.max_blocking_threads)
.field(
"thread_name",
&"<dyn Fn() -> String + Send + Sync + 'static>",
diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs
index e89de9d..b89fa4f 100644
--- a/src/runtime/driver.rs
+++ b/src/runtime/driver.rs
@@ -103,8 +103,8 @@ cfg_time! {
pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
- fn create_clock() -> Clock {
- crate::time::Clock::new()
+ fn create_clock(enable_pausing: bool) -> Clock {
+ crate::time::Clock::new(enable_pausing)
}
fn create_time_driver(
@@ -131,7 +131,7 @@ cfg_not_time! {
pub(crate) type Clock = ();
pub(crate) type TimeHandle = ();
- fn create_clock() -> Clock {
+ fn create_clock(_enable_pausing: bool) -> Clock {
()
}
@@ -161,13 +161,14 @@ pub(crate) struct Resources {
pub(crate) struct Cfg {
pub(crate) enable_io: bool,
pub(crate) enable_time: bool,
+ pub(crate) enable_pause_time: bool,
}
impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
- let clock = create_clock();
+ let clock = create_clock(cfg.enable_pause_time);
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, io_stack, clock.clone());
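The new `enable_pause_time` flag threads the scheduler kind chosen in `get_cfg` into the clock, so only the current-thread runtime builds a pausable clock. A sketch of the user-visible effect, using the public `tokio::time` API rather than these internals (assumes the `time` feature is enabled):

```rust
// Time can only be paused on a current-thread runtime; with the change above
// the multi-thread runtime builds its clock with pausing disabled, so
// `tokio::time::pause()` would panic there.
use std::time::Duration;

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .expect("failed to build runtime");

    rt.block_on(async {
        tokio::time::pause();
        // With the clock paused, timers are auto-advanced instead of
        // actually waiting, so this returns almost immediately.
        tokio::time::sleep(Duration::from_secs(3600)).await;
        tokio::time::resume();
    });
}
```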
diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs
index 138d13b..6ff3c39 100644
--- a/src/runtime/handle.rs
+++ b/src/runtime/handle.rs
@@ -142,7 +142,7 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
- #[cfg(feature = "tracing")]
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task");
self.spawner.spawn(future)
}
@@ -172,7 +172,7 @@ impl Handle {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
- #[cfg(feature = "tracing")]
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
let func = {
#[cfg(tokio_track_caller)]
let location = std::panic::Location::caller();
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index d7f068e..2c90acb 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -20,7 +20,7 @@
//!
//! ```no_run
//! use tokio::net::TcpListener;
-//! use tokio::prelude::*;
+//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -63,7 +63,7 @@
//!
//! ```no_run
//! use tokio::net::TcpListener;
-//! use tokio::prelude::*;
+//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
//! use tokio::runtime::Runtime;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
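The doc examples drop the removed `tokio::prelude` in favour of importing the I/O extension traits directly. For reference, a complete, self-contained variant of the accept-loop pattern those docs show, written against the new imports (a sketch, not the exact doc text):

```rust
// Minimal echo-style accept loop using the tokio 1.x imports the docs now
// recommend in place of `tokio::prelude::*`.
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                match socket.read(&mut buf).await {
                    // Remote side closed the connection.
                    Ok(0) => return,
                    Ok(n) => {
                        // Echo back whatever was read; bail out on write errors.
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => return,
                }
            }
        });
    }
}
```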
diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs
index bc544c9..31712e4 100644
--- a/src/runtime/thread_pool/worker.rs
+++ b/src/runtime/thread_pool/worker.rs
@@ -78,11 +78,12 @@ pub(super) struct Shared {
/// Coordinates idle workers
idle: Idle,
- /// Workers have have observed the shutdown signal
+ /// Cores that have observed the shutdown signal
///
/// The core is **not** placed back in the worker to avoid it from being
/// stolen by a thread that was spawned as part of `block_in_place`.
- shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
+ #[allow(clippy::vec_box)] // we're moving an already-boxed value
+ shutdown_cores: Mutex<Vec<Box<Core>>>,
}
/// Used to communicate with a worker from other threads.
@@ -157,7 +158,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
remotes: remotes.into_boxed_slice(),
inject: queue::Inject::new(),
idle: Idle::new(size),
- shutdown_workers: Mutex::new(vec![]),
+ shutdown_cores: Mutex::new(vec![]),
});
let mut launch = Launch(vec![]);
@@ -328,8 +329,10 @@ impl Context {
}
}
+ core.pre_shutdown(&self.worker);
+
// Signal shutdown
- self.worker.shared.shutdown(core, self.worker.clone());
+ self.worker.shared.shutdown(core);
Err(())
}
@@ -546,11 +549,9 @@ impl Core {
}
}
- // Shutdown the core
- fn shutdown(&mut self, worker: &Worker) {
- // Take the core
- let mut park = self.park.take().expect("park missing");
-
+ // Signals all tasks to shut down, and waits for them to complete. Must run
+ // before we enter the single-threaded phase of shutdown processing.
+ fn pre_shutdown(&mut self, worker: &Worker) {
// Signal to all tasks to shut down.
for header in self.tasks.iter() {
header.shutdown();
@@ -564,8 +565,17 @@ impl Core {
}
// Wait until signalled
+ let park = self.park.as_mut().expect("park missing");
park.park().expect("park failed");
}
+ }
+
+ // Shutdown the core
+ fn shutdown(&mut self) {
+ assert!(self.tasks.is_empty());
+
+ // Take the core
+ let mut park = self.park.take().expect("park missing");
// Drain the queue
while self.next_local_task().is_some() {}
@@ -629,52 +639,73 @@ impl task::Schedule for Arc<Worker> {
fn release(&self, task: &Task) -> Option<Task> {
use std::ptr::NonNull;
- CURRENT.with(|maybe_cx| {
- let cx = maybe_cx.expect("scheduler context missing");
+ enum Immediate {
+ // Task has been synchronously removed from the Core owned by the
+ // current thread
+ Removed(Option<Task>),
+ // Task is owned by another thread, so we need to notify it to clean
+ // up the task later.
+ MaybeRemote,
+ }
- if self.eq(&cx.worker) {
- let mut maybe_core = cx.core.borrow_mut();
+ let immediate = CURRENT.with(|maybe_cx| {
+ let cx = match maybe_cx {
+ Some(cx) => cx,
+ None => return Immediate::MaybeRemote,
+ };
- if let Some(core) = &mut *maybe_core {
- // Directly remove the task
- //
- // safety: the task is inserted in the list in `bind`.
- unsafe {
- let ptr = NonNull::from(task.header());
- return core.tasks.remove(ptr);
- }
- }
+ if !self.eq(&cx.worker) {
+ // Task owned by another core, so we need to notify it.
+ return Immediate::MaybeRemote;
}
- // Track the task to be released by the worker that owns it
- //
- // Safety: We get a new handle without incrementing the ref-count.
- // A ref-count is held by the "owned" linked list and it is only
- // ever removed from that list as part of the release process: this
- // method or popping the task from `pending_drop`. Thus, we can rely
- // on the ref-count held by the linked-list to keep the memory
- // alive.
- //
- // When the task is removed from the stack, it is forgotten instead
- // of dropped.
- let task = unsafe { Task::from_raw(task.header().into()) };
-
- self.remote().pending_drop.push(task);
+ let mut maybe_core = cx.core.borrow_mut();
- if cx.core.borrow().is_some() {
- return None;
+ if let Some(core) = &mut *maybe_core {
+ // Directly remove the task
+ //
+ // safety: the task is inserted in the list in `bind`.
+ unsafe {
+ let ptr = NonNull::from(task.header());
+ return Immediate::Removed(core.tasks.remove(ptr));
+ }
}
- // The worker core has been handed off to another thread. In the
- // event that the scheduler is currently shutting down, the thread
- // that owns the task may be waiting on the release to complete
- // shutdown.
- if self.inject().is_closed() {
- self.remote().unpark.unpark();
- }
+ Immediate::MaybeRemote
+ });
- None
- })
+ // Checks if we were called from within a worker, allowing for immediate
+ // removal of a scheduled task. Else we have to go through the slower
+ // process below where we remotely mark a task as dropped.
+ match immediate {
+ Immediate::Removed(task) => return task,
+ Immediate::MaybeRemote => (),
+ };
+
+ // Track the task to be released by the worker that owns it
+ //
+ // Safety: We get a new handle without incrementing the ref-count.
+ // A ref-count is held by the "owned" linked list and it is only
+ // ever removed from that list as part of the release process: this
+ // method or popping the task from `pending_drop`. Thus, we can rely
+ // on the ref-count held by the linked-list to keep the memory
+ // alive.
+ //
+ // When the task is removed from the stack, it is forgotten instead
+ // of dropped.
+ let task = unsafe { Task::from_raw(task.header().into()) };
+
+ self.remote().pending_drop.push(task);
+
+ // The worker core has been handed off to another thread. In the
+ // event that the scheduler is currently shutting down, the thread
+ // that owns the task may be waiting on the release to complete
+ // shutdown.
+ if self.inject().is_closed() {
+ self.remote().unpark.unpark();
+ }
+
+ None
}
fn schedule(&self, task: Notified) {
@@ -779,16 +810,16 @@ impl Shared {
/// its core back into its handle.
///
/// If all workers have reached this point, the final cleanup is performed.
- fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
- let mut workers = self.shutdown_workers.lock();
- workers.push((core, worker));
+ fn shutdown(&self, core: Box<Core>) {
+ let mut cores = self.shutdown_cores.lock();
+ cores.push(core);
- if workers.len() != self.remotes.len() {
+ if cores.len() != self.remotes.len() {
return;
}
- for (mut core, worker) in workers.drain(..) {
- core.shutdown(&worker);
+ for mut core in cores.drain(..) {
+ core.shutdown();
}
// Drain the injection queue
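The worker changes split shutdown into a per-worker `pre_shutdown` pass (signal tasks and wait for them) and a final single-threaded `shutdown` that only runs once every core has been handed back via `Shared::shutdown`. Below is a deliberately simplified, hypothetical sketch of that hand-off shape; all names and fields are invented for illustration and do not match tokio's real internals:

```rust
use std::sync::Mutex;

// Hypothetical stand-ins; the real types live in src/runtime/thread_pool.
struct Core {
    tasks: Vec<u32>,
}

impl Core {
    // Phase 1: each worker signals its tasks and waits for them to finish.
    fn pre_shutdown(&mut self) {
        self.tasks.clear();
    }

    // Phase 2: runs single-threaded after every core has checked in.
    fn shutdown(&mut self) {
        assert!(self.tasks.is_empty());
        // Drain local queues, drop the parker, etc.
    }
}

struct Shared {
    worker_count: usize,
    shutdown_cores: Mutex<Vec<Box<Core>>>,
}

impl Shared {
    fn shutdown(&self, core: Box<Core>) {
        let mut cores = self.shutdown_cores.lock().unwrap();
        cores.push(core);

        // Only the last worker to hand its core back performs final cleanup.
        if cores.len() != self.worker_count {
            return;
        }
        for mut core in cores.drain(..) {
            core.shutdown();
        }
    }
}

fn main() {
    let shared = Shared {
        worker_count: 1,
        shutdown_cores: Mutex::new(Vec::new()),
    };
    let mut core = Box::new(Core { tasks: vec![1, 2, 3] });
    core.pre_shutdown(); // runs on the worker thread, before the hand-off
    shared.shutdown(core); // hand the core back; the last one in cleans up
}
```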