aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/builder.rs
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-09-30 08:55:02 -0700
committerJoel Galenson <jgalenson@google.com>2021-09-30 08:55:40 -0700
commit5fe87985ba723ee4d9532495587d7114e4b6e143 (patch)
tree71a18fec0599d209bd7c1b95140dc75566fa3788 /src/runtime/builder.rs
parentd61267ffdfea9ed9be38e805f8e3ff78e384005f (diff)
downloadtokio-5fe87985ba723ee4d9532495587d7114e4b6e143.tar.gz
Upgrade rust/crates/tokio to 1.12.0
Test: make Change-Id: I4b0bd405c0b615f886e5a6606e0bf7c0ac7c6699
Diffstat (limited to 'src/runtime/builder.rs')
-rw-r--r--src/runtime/builder.rs131
1 files changed, 128 insertions, 3 deletions
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs
index 51bf8c8..91c365f 100644
--- a/src/runtime/builder.rs
+++ b/src/runtime/builder.rs
@@ -70,6 +70,12 @@ pub struct Builder {
/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,
+ /// To run before each worker thread is parked.
+ pub(super) before_park: Option<Callback>,
+
+ /// To run after each thread is unparked.
+ pub(super) after_unpark: Option<Callback>,
+
/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,
}
@@ -135,6 +141,8 @@ impl Builder {
// No worker thread callbacks
after_start: None,
before_stop: None,
+ before_park: None,
+ after_unpark: None,
keep_alive: None,
}
@@ -374,6 +382,120 @@ impl Builder {
self
}
+ /// Executes function `f` just before a thread is parked (goes idle).
+ /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
+ /// can be called, and may result in this thread being unparked immediately.
+ ///
+ /// This can be used to start work only when the executor is idle, or for bookkeeping
+ /// and monitoring purposes.
+ ///
+ /// Note: There can only be one park callback for a runtime; calling this function
+ /// more than once replaces the last callback defined, rather than adding to it.
+ ///
+ /// # Examples
+ ///
+ /// ## Multithreaded executor
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use std::sync::atomic::{AtomicBool, Ordering};
+ /// # use tokio::runtime;
+ /// # use tokio::sync::Barrier;
+ /// # pub fn main() {
+ /// let once = AtomicBool::new(true);
+ /// let barrier = Arc::new(Barrier::new(2));
+ ///
+ /// let runtime = runtime::Builder::new_multi_thread()
+ /// .worker_threads(1)
+ /// .on_thread_park({
+ /// let barrier = barrier.clone();
+ /// move || {
+ /// let barrier = barrier.clone();
+ /// if once.swap(false, Ordering::Relaxed) {
+ /// tokio::spawn(async move { barrier.wait().await; });
+ /// }
+ /// }
+ /// })
+ /// .build()
+ /// .unwrap();
+ ///
+ /// runtime.block_on(async {
+ /// barrier.wait().await;
+ /// })
+ /// # }
+ /// ```
+ /// ## Current thread executor
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use std::sync::atomic::{AtomicBool, Ordering};
+ /// # use tokio::runtime;
+ /// # use tokio::sync::Barrier;
+ /// # pub fn main() {
+ /// let once = AtomicBool::new(true);
+ /// let barrier = Arc::new(Barrier::new(2));
+ ///
+ /// let runtime = runtime::Builder::new_current_thread()
+ /// .on_thread_park({
+ /// let barrier = barrier.clone();
+ /// move || {
+ /// let barrier = barrier.clone();
+ /// if once.swap(false, Ordering::Relaxed) {
+ /// tokio::spawn(async move { barrier.wait().await; });
+ /// }
+ /// }
+ /// })
+ /// .build()
+ /// .unwrap();
+ ///
+ /// runtime.block_on(async {
+ /// barrier.wait().await;
+ /// })
+ /// # }
+ /// ```
+ #[cfg(not(loom))]
+ pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.before_park = Some(std::sync::Arc::new(f));
+ self
+ }
+
+ /// Executes function `f` just after a thread unparks (starts executing tasks).
+ ///
+ /// This is intended for bookkeeping and monitoring use cases; note that work
+ /// in this callback will increase latencies when the application has allowed one or
+ /// more runtime threads to go idle.
+ ///
+ /// Note: There can only be one unpark callback for a runtime; calling this function
+ /// more than once replaces the last callback defined, rather than adding to it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let runtime = runtime::Builder::new_multi_thread()
+ /// .on_thread_unpark(|| {
+ /// println!("thread unparking");
+ /// })
+ /// .build();
+ ///
+ /// runtime.unwrap().block_on(async {
+ /// tokio::task::yield_now().await;
+ /// println!("Hello from Tokio!");
+ /// })
+ /// # }
+ /// ```
+ #[cfg(not(loom))]
+ pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.after_unpark = Some(std::sync::Arc::new(f));
+ self
+ }
+
/// Creates the configured `Runtime`.
///
/// The returned `Runtime` instance is ready to spawn tasks.
@@ -441,7 +563,8 @@ impl Builder {
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
- let scheduler = BasicScheduler::new(driver);
+ let scheduler =
+ BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::Basic(scheduler.spawner().clone());
// Blocking pool
@@ -546,7 +669,7 @@ cfg_rt_multi_thread! {
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
- let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
+ let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
// Create the blocking pool
@@ -587,7 +710,9 @@ impl fmt::Debug for Builder {
)
.field("thread_stack_size", &self.thread_stack_size)
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
- .field("before_stop", &self.after_start.as_ref().map(|_| "..."))
+ .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
+ .field("before_park", &self.before_park.as_ref().map(|_| "..."))
+ .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
.finish()
}
}