diff options
author | Joel Galenson <jgalenson@google.com> | 2020-10-23 09:39:31 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2020-10-23 09:52:09 -0700 |
commit | d5495b03381a3ebe0805db353d198b285b535b5c (patch) | |
tree | 778b8524d15fca8b73db0253ee0e1919d0848bb6 /src/runtime/blocking | |
parent | ba45c5bedf31df8562364c61d3dfb5262f10642e (diff) | |
download | tokio-d5495b03381a3ebe0805db353d198b285b535b5c.tar.gz |
Update to tokio-0.3.1 and add new features
Test: Build
Change-Id: I5b5b9b386a21982a019653d0cf0bd3afc505cfac
Diffstat (limited to 'src/runtime/blocking')
-rw-r--r-- | src/runtime/blocking/mod.rs | 21 | ||||
-rw-r--r-- | src/runtime/blocking/pool.rs | 72 | ||||
-rw-r--r-- | src/runtime/blocking/shutdown.rs | 11 |
3 files changed, 65 insertions, 39 deletions
diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs index 0b36a75..fece3c2 100644 --- a/src/runtime/blocking/mod.rs +++ b/src/runtime/blocking/mod.rs @@ -3,22 +3,20 @@ //! shells. This isolates the complexity of dealing with conditional //! compilation. -cfg_blocking_impl! { - mod pool; - pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner}; +mod pool; +pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; - mod schedule; - mod shutdown; - pub(crate) mod task; +mod schedule; +mod shutdown; +pub(crate) mod task; - use crate::runtime::Builder; - - pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { - BlockingPool::new(builder, thread_cap) +use crate::runtime::Builder; - } +pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { + BlockingPool::new(builder, thread_cap) } +/* cfg_not_blocking_impl! { use crate::runtime::Builder; use std::time::Duration; @@ -41,3 +39,4 @@ cfg_not_blocking_impl! { } } } +*/ diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 40d417b..2967a10 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -5,9 +5,13 @@ use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; use crate::runtime::blocking::task::BlockingTask; +use crate::runtime::builder::ThreadNameFn; +use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; +use slab::Slab; + use std::collections::VecDeque; use std::fmt; use std::time::Duration; @@ -30,7 +34,7 @@ struct Inner { condvar: Condvar, /// Spawned threads use this name - thread_name: String, + thread_name: ThreadNameFn, /// Spawned thread stack size stack_size: Option<usize>, @@ -41,7 +45,11 @@ struct Inner { /// Call before a thread stops before_stop: Option<Callback>, + // Maximum number of threads thread_cap: usize, + + // Customizable wait timeout + keep_alive: Duration, } struct Shared { @@ -51,6 +59,7 @@ struct Shared { num_notify: u32, shutdown: bool, shutdown_tx: Option<shutdown::Sender>, + worker_threads: Slab<thread::JoinHandle<()>>, } type Task = task::Notified<NoopSchedule>; @@ -62,11 +71,8 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, { - let rt = Handle::current(); - - let (task, handle) = task::joinable(BlockingTask::new(func)); - let _ = rt.blocking_spawner.spawn(task, &rt); - handle + let rt = context::current().expect("not currently running on the Tokio runtime."); + rt.spawn_blocking(func) } #[allow(dead_code)] @@ -74,7 +80,7 @@ pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()> where F: FnOnce() -> R + Send + 'static, { - let rt = Handle::current(); + let rt = context::current().expect("not currently running on the Tokio runtime."); let (task, _handle) = task::joinable(BlockingTask::new(func)); rt.blocking_spawner.spawn(task, &rt) @@ -85,6 +91,7 @@ where impl BlockingPool { pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); + let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE); BlockingPool { spawner: Spawner { @@ -96,6 +103,7 @@ impl BlockingPool { num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), + worker_threads: Slab::new(), }), condvar: Condvar::new(), thread_name: builder.thread_name.clone(), @@ -103,6 +111,7 @@ impl BlockingPool { after_start: builder.after_start.clone(), before_stop: builder.before_stop.clone(), thread_cap, + keep_alive, }), }, shutdown_rx, @@ -114,7 +123,7 @@ impl BlockingPool { } pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) { - let mut shared = self.spawner.inner.shared.lock().unwrap(); + let mut shared = self.spawner.inner.shared.lock(); // The function can be called multiple times. First, by explicitly // calling `shutdown` then by the drop handler calling `shutdown`. This @@ -126,10 +135,15 @@ impl BlockingPool { shared.shutdown = true; shared.shutdown_tx = None; self.spawner.inner.condvar.notify_all(); + let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new()); drop(shared); - self.shutdown_rx.wait(timeout); + if self.shutdown_rx.wait(timeout) { + for handle in workers.drain() { + let _ = handle.join(); + } + } } } @@ -150,7 +164,7 @@ impl fmt::Debug for BlockingPool { impl Spawner { pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { let shutdown_tx = { - let mut shared = self.inner.shared.lock().unwrap(); + let mut shared = self.inner.shared.lock(); if shared.shutdown { // Shutdown the task @@ -187,14 +201,24 @@ impl Spawner { }; if let Some(shutdown_tx) = shutdown_tx { - self.spawn_thread(shutdown_tx, rt); + let mut shared = self.inner.shared.lock(); + let entry = shared.worker_threads.vacant_entry(); + + let handle = self.spawn_thread(shutdown_tx, rt, entry.key()); + + entry.insert(handle); } Ok(()) } - fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { - let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); + fn spawn_thread( + &self, + shutdown_tx: shutdown::Sender, + rt: &Handle, + worker_id: usize, + ) -> thread::JoinHandle<()> { + let mut builder = thread::Builder::new().name((self.inner.thread_name)()); if let Some(stack_size) = self.inner.stack_size { builder = builder.stack_size(stack_size); @@ -205,23 +229,21 @@ impl Spawner { builder .spawn(move || { // Only the reference should be moved into the closure - let rt = &rt; - rt.enter(move || { - rt.blocking_spawner.inner.run(); - drop(shutdown_tx); - }) + let _enter = crate::runtime::context::enter(rt.clone()); + rt.blocking_spawner.inner.run(worker_id); + drop(shutdown_tx); }) - .unwrap(); + .unwrap() } } impl Inner { - fn run(&self) { + fn run(&self, worker_id: usize) { if let Some(f) = &self.after_start { f() } - let mut shared = self.shared.lock().unwrap(); + let mut shared = self.shared.lock(); 'main: loop { // BUSY @@ -229,14 +251,14 @@ impl Inner { drop(shared); task.run(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // IDLE shared.num_idle += 1; while !shared.shutdown { - let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); + let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); shared = lock_result.0; let timeout_result = lock_result.1; @@ -252,6 +274,8 @@ impl Inner { // Even if the condvar "timed out", if the pool is entering the // shutdown phase, we want to perform the cleanup logic. if !shared.shutdown && timeout_result.timed_out() { + shared.worker_threads.remove(worker_id); + break 'main; } @@ -264,7 +288,7 @@ impl Inner { drop(shared); task.shutdown(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // Work was produced, and we "took" it (by decrementing num_notify). diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs index e76a701..3b6cc59 100644 --- a/src/runtime/blocking/shutdown.rs +++ b/src/runtime/blocking/shutdown.rs @@ -32,11 +32,13 @@ impl Receiver { /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout` /// duration. If `timeout` is `None`, then the thread is blocked until the /// shutdown signal is received. - pub(crate) fn wait(&mut self, timeout: Option<Duration>) { + /// + /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`. + pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool { use crate::runtime::enter::try_enter; if timeout == Some(Duration::from_nanos(0)) { - return; + return true; } let mut e = match try_enter(false) { @@ -44,7 +46,7 @@ impl Receiver { _ => { if std::thread::panicking() { // Don't panic in a panic - return; + return false; } else { panic!( "Cannot drop a runtime in a context where blocking is not allowed. \ @@ -60,9 +62,10 @@ impl Receiver { // current thread (usually, shutting down a runtime stored in a // thread-local). if let Some(timeout) = timeout { - let _ = e.block_on_timeout(&mut self.rx, timeout); + e.block_on_timeout(&mut self.rx, timeout).is_ok() } else { let _ = e.block_on(&mut self.rx); + true } } } |