diff options
Diffstat (limited to 'src/runtime/blocking/pool.rs')
-rw-r--r-- | src/runtime/blocking/pool.rs | 72 |
1 files changed, 48 insertions, 24 deletions
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 40d417b..2967a10 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -5,9 +5,13 @@ use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; use crate::runtime::blocking::task::BlockingTask; +use crate::runtime::builder::ThreadNameFn; +use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; +use slab::Slab; + use std::collections::VecDeque; use std::fmt; use std::time::Duration; @@ -30,7 +34,7 @@ struct Inner { condvar: Condvar, /// Spawned threads use this name - thread_name: String, + thread_name: ThreadNameFn, /// Spawned thread stack size stack_size: Option<usize>, @@ -41,7 +45,11 @@ struct Inner { /// Call before a thread stops before_stop: Option<Callback>, + // Maximum number of threads thread_cap: usize, + + // Customizable wait timeout + keep_alive: Duration, } struct Shared { @@ -51,6 +59,7 @@ struct Shared { num_notify: u32, shutdown: bool, shutdown_tx: Option<shutdown::Sender>, + worker_threads: Slab<thread::JoinHandle<()>>, } type Task = task::Notified<NoopSchedule>; @@ -62,11 +71,8 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, { - let rt = Handle::current(); - - let (task, handle) = task::joinable(BlockingTask::new(func)); - let _ = rt.blocking_spawner.spawn(task, &rt); - handle + let rt = context::current().expect("not currently running on the Tokio runtime."); + rt.spawn_blocking(func) } #[allow(dead_code)] @@ -74,7 +80,7 @@ pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()> where F: FnOnce() -> R + Send + 'static, { - let rt = Handle::current(); + let rt = context::current().expect("not currently running on the Tokio runtime."); let (task, _handle) = task::joinable(BlockingTask::new(func)); rt.blocking_spawner.spawn(task, &rt) @@ -85,6 +91,7 @@ where impl BlockingPool { pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); + let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE); BlockingPool { spawner: Spawner { @@ -96,6 +103,7 @@ impl BlockingPool { num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), + worker_threads: Slab::new(), }), condvar: Condvar::new(), thread_name: builder.thread_name.clone(), @@ -103,6 +111,7 @@ impl BlockingPool { after_start: builder.after_start.clone(), before_stop: builder.before_stop.clone(), thread_cap, + keep_alive, }), }, shutdown_rx, @@ -114,7 +123,7 @@ impl BlockingPool { } pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) { - let mut shared = self.spawner.inner.shared.lock().unwrap(); + let mut shared = self.spawner.inner.shared.lock(); // The function can be called multiple times. First, by explicitly // calling `shutdown` then by the drop handler calling `shutdown`. This @@ -126,10 +135,15 @@ impl BlockingPool { shared.shutdown = true; shared.shutdown_tx = None; self.spawner.inner.condvar.notify_all(); + let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new()); drop(shared); - self.shutdown_rx.wait(timeout); + if self.shutdown_rx.wait(timeout) { + for handle in workers.drain() { + let _ = handle.join(); + } + } } } @@ -150,7 +164,7 @@ impl fmt::Debug for BlockingPool { impl Spawner { pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { let shutdown_tx = { - let mut shared = self.inner.shared.lock().unwrap(); + let mut shared = self.inner.shared.lock(); if shared.shutdown { // Shutdown the task @@ -187,14 +201,24 @@ impl Spawner { }; if let Some(shutdown_tx) = shutdown_tx { - self.spawn_thread(shutdown_tx, rt); + let mut shared = self.inner.shared.lock(); + let entry = shared.worker_threads.vacant_entry(); + + let handle = self.spawn_thread(shutdown_tx, rt, entry.key()); + + entry.insert(handle); } Ok(()) } - fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { - let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); + fn spawn_thread( + &self, + shutdown_tx: shutdown::Sender, + rt: &Handle, + worker_id: usize, + ) -> thread::JoinHandle<()> { + let mut builder = thread::Builder::new().name((self.inner.thread_name)()); if let Some(stack_size) = self.inner.stack_size { builder = builder.stack_size(stack_size); @@ -205,23 +229,21 @@ impl Spawner { builder .spawn(move || { // Only the reference should be moved into the closure - let rt = &rt; - rt.enter(move || { - rt.blocking_spawner.inner.run(); - drop(shutdown_tx); - }) + let _enter = crate::runtime::context::enter(rt.clone()); + rt.blocking_spawner.inner.run(worker_id); + drop(shutdown_tx); }) - .unwrap(); + .unwrap() } } impl Inner { - fn run(&self) { + fn run(&self, worker_id: usize) { if let Some(f) = &self.after_start { f() } - let mut shared = self.shared.lock().unwrap(); + let mut shared = self.shared.lock(); 'main: loop { // BUSY @@ -229,14 +251,14 @@ impl Inner { drop(shared); task.run(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // IDLE shared.num_idle += 1; while !shared.shutdown { - let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); + let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); shared = lock_result.0; let timeout_result = lock_result.1; @@ -252,6 +274,8 @@ impl Inner { // Even if the condvar "timed out", if the pool is entering the // shutdown phase, we want to perform the cleanup logic. if !shared.shutdown && timeout_result.timed_out() { + shared.worker_threads.remove(worker_id); + break 'main; } @@ -264,7 +288,7 @@ impl Inner { drop(shared); task.shutdown(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // Work was produced, and we "took" it (by decrementing num_notify). |