diff options
author | Joel Galenson <jgalenson@google.com> | 2021-09-30 08:55:02 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-09-30 08:55:40 -0700 |
commit | 5fe87985ba723ee4d9532495587d7114e4b6e143 (patch) | |
tree | 71a18fec0599d209bd7c1b95140dc75566fa3788 /src/runtime/builder.rs | |
parent | d61267ffdfea9ed9be38e805f8e3ff78e384005f (diff) | |
download | tokio-5fe87985ba723ee4d9532495587d7114e4b6e143.tar.gz |
Upgrade rust/crates/tokio to 1.12.0
Test: make
Change-Id: I4b0bd405c0b615f886e5a6606e0bf7c0ac7c6699
Diffstat (limited to 'src/runtime/builder.rs')
-rw-r--r-- | src/runtime/builder.rs | 131 |
1 files changed, 128 insertions, 3 deletions
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs index 51bf8c8..91c365f 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/builder.rs @@ -70,6 +70,12 @@ pub struct Builder { /// To run before each worker thread stops pub(super) before_stop: Option<Callback>, + /// To run before each worker thread is parked. + pub(super) before_park: Option<Callback>, + + /// To run after each thread is unparked. + pub(super) after_unpark: Option<Callback>, + /// Customizable keep alive timeout for BlockingPool pub(super) keep_alive: Option<Duration>, } @@ -135,6 +141,8 @@ impl Builder { // No worker thread callbacks after_start: None, before_stop: None, + before_park: None, + after_unpark: None, keep_alive: None, } @@ -374,6 +382,120 @@ impl Builder { self } + /// Executes function `f` just before a thread is parked (goes idle). + /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn) + /// can be called, and may result in this thread being unparked immediately. + /// + /// This can be used to start work only when the executor is idle, or for bookkeeping + /// and monitoring purposes. + /// + /// Note: There can only be one park callback for a runtime; calling this function + /// more than once replaces the last callback defined, rather than adding to it. + /// + /// # Examples + /// + /// ## Multithreaded executor + /// ``` + /// # use std::sync::Arc; + /// # use std::sync::atomic::{AtomicBool, Ordering}; + /// # use tokio::runtime; + /// # use tokio::sync::Barrier; + /// # pub fn main() { + /// let once = AtomicBool::new(true); + /// let barrier = Arc::new(Barrier::new(2)); + /// + /// let runtime = runtime::Builder::new_multi_thread() + /// .worker_threads(1) + /// .on_thread_park({ + /// let barrier = barrier.clone(); + /// move || { + /// let barrier = barrier.clone(); + /// if once.swap(false, Ordering::Relaxed) { + /// tokio::spawn(async move { barrier.wait().await; }); + /// } + /// } + /// }) + /// .build() + /// .unwrap(); + /// + /// runtime.block_on(async { + /// barrier.wait().await; + /// }) + /// # } + /// ``` + /// ## Current thread executor + /// ``` + /// # use std::sync::Arc; + /// # use std::sync::atomic::{AtomicBool, Ordering}; + /// # use tokio::runtime; + /// # use tokio::sync::Barrier; + /// # pub fn main() { + /// let once = AtomicBool::new(true); + /// let barrier = Arc::new(Barrier::new(2)); + /// + /// let runtime = runtime::Builder::new_current_thread() + /// .on_thread_park({ + /// let barrier = barrier.clone(); + /// move || { + /// let barrier = barrier.clone(); + /// if once.swap(false, Ordering::Relaxed) { + /// tokio::spawn(async move { barrier.wait().await; }); + /// } + /// } + /// }) + /// .build() + /// .unwrap(); + /// + /// runtime.block_on(async { + /// barrier.wait().await; + /// }) + /// # } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.before_park = Some(std::sync::Arc::new(f)); + self + } + + /// Executes function `f` just after a thread unparks (starts executing tasks). + /// + /// This is intended for bookkeeping and monitoring use cases; note that work + /// in this callback will increase latencies when the application has allowed one or + /// more runtime threads to go idle. + /// + /// Note: There can only be one unpark callback for a runtime; calling this function + /// more than once replaces the last callback defined, rather than adding to it. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let runtime = runtime::Builder::new_multi_thread() + /// .on_thread_unpark(|| { + /// println!("thread unparking"); + /// }) + /// .build(); + /// + /// runtime.unwrap().block_on(async { + /// tokio::task::yield_now().await; + /// println!("Hello from Tokio!"); + /// }) + /// # } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.after_unpark = Some(std::sync::Arc::new(f)); + self + } + /// Creates the configured `Runtime`. /// /// The returned `Runtime` instance is ready to spawn tasks. @@ -441,7 +563,8 @@ impl Builder { // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. - let scheduler = BasicScheduler::new(driver); + let scheduler = + BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone()); let spawner = Spawner::Basic(scheduler.spawner().clone()); // Blocking pool @@ -546,7 +669,7 @@ cfg_rt_multi_thread! { let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); + let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone()); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool @@ -587,7 +710,9 @@ impl fmt::Debug for Builder { ) .field("thread_stack_size", &self.thread_stack_size) .field("after_start", &self.after_start.as_ref().map(|_| "...")) - .field("before_stop", &self.after_start.as_ref().map(|_| "...")) + .field("before_stop", &self.before_stop.as_ref().map(|_| "...")) + .field("before_park", &self.before_park.as_ref().map(|_| "...")) + .field("after_unpark", &self.after_unpark.as_ref().map(|_| "...")) .finish() } } |