aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/thread_pool
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-09-30 08:55:02 -0700
committerJoel Galenson <jgalenson@google.com>2021-09-30 08:55:40 -0700
commit5fe87985ba723ee4d9532495587d7114e4b6e143 (patch)
tree71a18fec0599d209bd7c1b95140dc75566fa3788 /src/runtime/thread_pool
parentd61267ffdfea9ed9be38e805f8e3ff78e384005f (diff)
downloadtokio-5fe87985ba723ee4d9532495587d7114e4b6e143.tar.gz
Upgrade rust/crates/tokio to 1.12.0
Test: make Change-Id: I4b0bd405c0b615f886e5a6606e0bf7c0ac7c6699
Diffstat (limited to 'src/runtime/thread_pool')
-rw-r--r--src/runtime/thread_pool/mod.rs16
-rw-r--r--src/runtime/thread_pool/worker.rs78
2 files changed, 79 insertions, 15 deletions
diff --git a/src/runtime/thread_pool/mod.rs b/src/runtime/thread_pool/mod.rs
index 3808aa2..f2e68f6 100644
--- a/src/runtime/thread_pool/mod.rs
+++ b/src/runtime/thread_pool/mod.rs
@@ -12,8 +12,9 @@ pub(crate) use worker::Launch;
pub(crate) use worker::block_in_place;
use crate::loom::sync::Arc;
+use crate::runtime::stats::RuntimeStats;
use crate::runtime::task::JoinHandle;
-use crate::runtime::Parker;
+use crate::runtime::{Callback, Parker};
use std::fmt;
use std::future::Future;
@@ -43,8 +44,13 @@ pub(crate) struct Spawner {
// ===== impl ThreadPool =====
impl ThreadPool {
- pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) {
- let (shared, launch) = worker::create(size, parker);
+ pub(crate) fn new(
+ size: usize,
+ parker: Parker,
+ before_park: Option<Callback>,
+ after_unpark: Option<Callback>,
+ ) -> (ThreadPool, Launch) {
+ let (shared, launch) = worker::create(size, parker, before_park, after_unpark);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };
@@ -99,6 +105,10 @@ impl Spawner {
pub(crate) fn shutdown(&mut self) {
self.shared.close();
}
+
+ pub(crate) fn stats(&self) -> &RuntimeStats {
+ self.shared.stats()
+ }
}
impl fmt::Debug for Spawner {
diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs
index f5004c0..44f8db8 100644
--- a/src/runtime/thread_pool/worker.rs
+++ b/src/runtime/thread_pool/worker.rs
@@ -64,9 +64,10 @@ use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
+use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
-use crate::runtime::{queue, task};
+use crate::runtime::{queue, task, Callback};
use crate::util::FastRand;
use std::cell::RefCell;
@@ -112,6 +113,9 @@ struct Core {
/// borrow checker happy.
park: Option<Parker>,
+ /// Batching stats so they can be submitted to RuntimeStats.
+ stats: WorkerStatsBatcher,
+
/// Fast random number generator.
rand: FastRand,
}
@@ -137,6 +141,14 @@ pub(super) struct Shared {
/// stolen by a thread that was spawned as part of `block_in_place`.
#[allow(clippy::vec_box)] // we're moving an already-boxed value
shutdown_cores: Mutex<Vec<Box<Core>>>,
+
+ /// Callback for a worker parking itself
+ before_park: Option<Callback>,
+ /// Callback for a worker unparking itself
+ after_unpark: Option<Callback>,
+
+ /// Collect stats from the runtime.
+ stats: RuntimeStats,
}
/// Used to communicate with a worker from other threads.
@@ -174,12 +186,17 @@ type Notified = task::Notified<Arc<Shared>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);
-pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
+pub(super) fn create(
+ size: usize,
+ park: Parker,
+ before_park: Option<Callback>,
+ after_unpark: Option<Callback>,
+) -> (Arc<Shared>, Launch) {
let mut cores = vec![];
let mut remotes = vec![];
// Create the local queues
- for _ in 0..size {
+ for i in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
@@ -192,6 +209,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
is_searching: false,
is_shutdown: false,
park: Some(park),
+ stats: WorkerStatsBatcher::new(i),
rand: FastRand::new(seed()),
}));
@@ -204,6 +222,9 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
+ before_park,
+ after_unpark,
+ stats: RuntimeStats::new(size),
});
let mut launch = Launch(vec![]);
@@ -391,6 +412,7 @@ impl Context {
core.transition_from_searching(&self.worker);
// Make the core available to the runtime context
+ core.stats.incr_poll_count();
*self.core.borrow_mut() = Some(core);
// Run the task
@@ -415,6 +437,7 @@ impl Context {
if coop::has_budget_remaining() {
// Run the LIFO task, then loop
+ core.stats.incr_poll_count();
*self.core.borrow_mut() = Some(core);
let task = self.worker.shared.owned.assert_owner(task);
task.run();
@@ -442,19 +465,26 @@ impl Context {
}
fn park(&self, mut core: Box<Core>) -> Box<Core> {
- core.transition_to_parked(&self.worker);
+ if let Some(f) = &self.worker.shared.before_park {
+ f();
+ }
- while !core.is_shutdown {
- core = self.park_timeout(core, None);
+ if core.transition_to_parked(&self.worker) {
+ while !core.is_shutdown {
+ core = self.park_timeout(core, None);
- // Run regularly scheduled maintenance
- core.maintenance(&self.worker);
+ // Run regularly scheduled maintenance
+ core.maintenance(&self.worker);
- if core.transition_from_parked(&self.worker) {
- return core;
+ if core.transition_from_parked(&self.worker) {
+ break;
+ }
}
}
+ if let Some(f) = &self.worker.shared.after_unpark {
+ f();
+ }
core
}
@@ -462,6 +492,8 @@ impl Context {
// Take the parker out of core
let mut park = core.park.take().expect("park missing");
+ core.stats.about_to_park();
+
// Store `core` in context
*self.core.borrow_mut() = Some(core);
@@ -483,6 +515,8 @@ impl Context {
self.worker.shared.notify_parked();
}
+ core.stats.returned_from_park();
+
core
}
}
@@ -524,7 +558,10 @@ impl Core {
}
let target = &worker.shared.remotes[i];
- if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
+ if let Some(task) = target
+ .steal
+ .steal_into(&mut self.run_queue, &mut self.stats)
+ {
return Some(task);
}
}
@@ -551,7 +588,14 @@ impl Core {
}
/// Prepare the worker state for parking
- fn transition_to_parked(&mut self, worker: &Worker) {
+ ///
+ /// Returns true if the transition happend, false if there is work to do first
+ fn transition_to_parked(&mut self, worker: &Worker) -> bool {
+ // Workers should not park if they have work to do
+ if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
+ return false;
+ }
+
// When the final worker transitions **out** of searching to parked, it
// must check all the queues one last time in case work materialized
// between the last work scan and transitioning out of searching.
@@ -567,6 +611,8 @@ impl Core {
if is_last_searcher {
worker.shared.notify_if_work_pending();
}
+
+ true
}
/// Returns `true` if the transition happened.
@@ -590,6 +636,8 @@ impl Core {
/// Runs maintenance work such as checking the pool's state.
fn maintenance(&mut self, worker: &Worker) {
+ self.stats.submit(&worker.shared.stats);
+
if !self.is_shutdown {
// Check if the scheduler has been shutdown
self.is_shutdown = worker.inject().is_closed();
@@ -601,6 +649,8 @@ impl Core {
fn pre_shutdown(&mut self, worker: &Worker) {
// Signal to all tasks to shut down.
worker.shared.owned.close_and_shutdown_all();
+
+ self.stats.submit(&worker.shared.stats);
}
/// Shutdown the core
@@ -651,6 +701,10 @@ impl Shared {
handle
}
+ pub(crate) fn stats(&self) -> &RuntimeStats {
+ &self.stats
+ }
+
pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
CURRENT.with(|maybe_cx| {
if let Some(cx) = maybe_cx {