diff options
author | Joel Galenson <jgalenson@google.com> | 2021-09-30 08:55:02 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-09-30 08:55:40 -0700 |
commit | 5fe87985ba723ee4d9532495587d7114e4b6e143 (patch) | |
tree | 71a18fec0599d209bd7c1b95140dc75566fa3788 /src/runtime/thread_pool | |
parent | d61267ffdfea9ed9be38e805f8e3ff78e384005f (diff) | |
download | tokio-5fe87985ba723ee4d9532495587d7114e4b6e143.tar.gz |
Upgrade rust/crates/tokio to 1.12.0
Test: make
Change-Id: I4b0bd405c0b615f886e5a6606e0bf7c0ac7c6699
Diffstat (limited to 'src/runtime/thread_pool')
-rw-r--r-- | src/runtime/thread_pool/mod.rs | 16 | ||||
-rw-r--r-- | src/runtime/thread_pool/worker.rs | 78 |
2 files changed, 79 insertions, 15 deletions
diff --git a/src/runtime/thread_pool/mod.rs b/src/runtime/thread_pool/mod.rs index 3808aa2..f2e68f6 100644 --- a/src/runtime/thread_pool/mod.rs +++ b/src/runtime/thread_pool/mod.rs @@ -12,8 +12,9 @@ pub(crate) use worker::Launch; pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; +use crate::runtime::stats::RuntimeStats; use crate::runtime::task::JoinHandle; -use crate::runtime::Parker; +use crate::runtime::{Callback, Parker}; use std::fmt; use std::future::Future; @@ -43,8 +44,13 @@ pub(crate) struct Spawner { // ===== impl ThreadPool ===== impl ThreadPool { - pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) { - let (shared, launch) = worker::create(size, parker); + pub(crate) fn new( + size: usize, + parker: Parker, + before_park: Option<Callback>, + after_unpark: Option<Callback>, + ) -> (ThreadPool, Launch) { + let (shared, launch) = worker::create(size, parker, before_park, after_unpark); let spawner = Spawner { shared }; let thread_pool = ThreadPool { spawner }; @@ -99,6 +105,10 @@ impl Spawner { pub(crate) fn shutdown(&mut self) { self.shared.close(); } + + pub(crate) fn stats(&self) -> &RuntimeStats { + self.shared.stats() + } } impl fmt::Debug for Spawner { diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs index f5004c0..44f8db8 100644 --- a/src/runtime/thread_pool/worker.rs +++ b/src/runtime/thread_pool/worker.rs @@ -64,9 +64,10 @@ use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; +use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; use crate::runtime::thread_pool::{AtomicCell, Idle}; -use crate::runtime::{queue, task}; +use crate::runtime::{queue, task, Callback}; use crate::util::FastRand; use std::cell::RefCell; @@ -112,6 +113,9 @@ struct Core { /// borrow checker happy. park: Option<Parker>, + /// Batching stats so they can be submitted to RuntimeStats. + stats: WorkerStatsBatcher, + /// Fast random number generator. rand: FastRand, } @@ -137,6 +141,14 @@ pub(super) struct Shared { /// stolen by a thread that was spawned as part of `block_in_place`. #[allow(clippy::vec_box)] // we're moving an already-boxed value shutdown_cores: Mutex<Vec<Box<Core>>>, + + /// Callback for a worker parking itself + before_park: Option<Callback>, + /// Callback for a worker unparking itself + after_unpark: Option<Callback>, + + /// Collect stats from the runtime. + stats: RuntimeStats, } /// Used to communicate with a worker from other threads. @@ -174,12 +186,17 @@ type Notified = task::Notified<Arc<Shared>>; // Tracks thread-local state scoped_thread_local!(static CURRENT: Context); -pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { +pub(super) fn create( + size: usize, + park: Parker, + before_park: Option<Callback>, + after_unpark: Option<Callback>, +) -> (Arc<Shared>, Launch) { let mut cores = vec![]; let mut remotes = vec![]; // Create the local queues - for _ in 0..size { + for i in 0..size { let (steal, run_queue) = queue::local(); let park = park.clone(); @@ -192,6 +209,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { is_searching: false, is_shutdown: false, park: Some(park), + stats: WorkerStatsBatcher::new(i), rand: FastRand::new(seed()), })); @@ -204,6 +222,9 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { idle: Idle::new(size), owned: OwnedTasks::new(), shutdown_cores: Mutex::new(vec![]), + before_park, + after_unpark, + stats: RuntimeStats::new(size), }); let mut launch = Launch(vec![]); @@ -391,6 +412,7 @@ impl Context { core.transition_from_searching(&self.worker); // Make the core available to the runtime context + core.stats.incr_poll_count(); *self.core.borrow_mut() = Some(core); // Run the task @@ -415,6 +437,7 @@ impl Context { if coop::has_budget_remaining() { // Run the LIFO task, then loop + core.stats.incr_poll_count(); *self.core.borrow_mut() = Some(core); let task = self.worker.shared.owned.assert_owner(task); task.run(); @@ -442,19 +465,26 @@ impl Context { } fn park(&self, mut core: Box<Core>) -> Box<Core> { - core.transition_to_parked(&self.worker); + if let Some(f) = &self.worker.shared.before_park { + f(); + } - while !core.is_shutdown { - core = self.park_timeout(core, None); + if core.transition_to_parked(&self.worker) { + while !core.is_shutdown { + core = self.park_timeout(core, None); - // Run regularly scheduled maintenance - core.maintenance(&self.worker); + // Run regularly scheduled maintenance + core.maintenance(&self.worker); - if core.transition_from_parked(&self.worker) { - return core; + if core.transition_from_parked(&self.worker) { + break; + } } } + if let Some(f) = &self.worker.shared.after_unpark { + f(); + } core } @@ -462,6 +492,8 @@ impl Context { // Take the parker out of core let mut park = core.park.take().expect("park missing"); + core.stats.about_to_park(); + // Store `core` in context *self.core.borrow_mut() = Some(core); @@ -483,6 +515,8 @@ impl Context { self.worker.shared.notify_parked(); } + core.stats.returned_from_park(); + core } } @@ -524,7 +558,10 @@ impl Core { } let target = &worker.shared.remotes[i]; - if let Some(task) = target.steal.steal_into(&mut self.run_queue) { + if let Some(task) = target + .steal + .steal_into(&mut self.run_queue, &mut self.stats) + { return Some(task); } } @@ -551,7 +588,14 @@ impl Core { } /// Prepare the worker state for parking - fn transition_to_parked(&mut self, worker: &Worker) { + /// + /// Returns true if the transition happend, false if there is work to do first + fn transition_to_parked(&mut self, worker: &Worker) -> bool { + // Workers should not park if they have work to do + if self.lifo_slot.is_some() || self.run_queue.has_tasks() { + return false; + } + // When the final worker transitions **out** of searching to parked, it // must check all the queues one last time in case work materialized // between the last work scan and transitioning out of searching. @@ -567,6 +611,8 @@ impl Core { if is_last_searcher { worker.shared.notify_if_work_pending(); } + + true } /// Returns `true` if the transition happened. @@ -590,6 +636,8 @@ impl Core { /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { + self.stats.submit(&worker.shared.stats); + if !self.is_shutdown { // Check if the scheduler has been shutdown self.is_shutdown = worker.inject().is_closed(); @@ -601,6 +649,8 @@ impl Core { fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. worker.shared.owned.close_and_shutdown_all(); + + self.stats.submit(&worker.shared.stats); } /// Shutdown the core @@ -651,6 +701,10 @@ impl Shared { handle } + pub(crate) fn stats(&self) -> &RuntimeStats { + &self.stats + } + pub(super) fn schedule(&self, task: Notified, is_yield: bool) { CURRENT.with(|maybe_cx| { if let Some(cx) = maybe_cx { |