diff options
author | Haibo Huang <hhb@google.com> | 2021-01-14 17:23:22 -0800 |
---|---|---|
committer | Jeff Vander Stoep <jeffv@google.com> | 2021-01-15 20:44:08 +0100 |
commit | 290fc4903cd00fc31d93e0ecd49c402e6833c569 (patch) | |
tree | 4a9646d2ab712bae1ead875992160c7248588daf /src/runtime | |
parent | 84cad6596f48e471881980dcba7df9cb5b4b0139 (diff) | |
download | tokio-290fc4903cd00fc31d93e0ecd49c402e6833c569.tar.gz |
Upgrade rust/crates/tokio to 1.0.2platform-tools-31.0.0
Test: make
Change-Id: Ic48ff709bade266749eac8c146856901ce78da7f
Diffstat (limited to 'src/runtime')
-rw-r--r-- | src/runtime/blocking/shutdown.rs | 2 | ||||
-rw-r--r-- | src/runtime/builder.rs | 39 | ||||
-rw-r--r-- | src/runtime/driver.rs | 9 | ||||
-rw-r--r-- | src/runtime/handle.rs | 4 | ||||
-rw-r--r-- | src/runtime/mod.rs | 4 | ||||
-rw-r--r-- | src/runtime/thread_pool/worker.rs | 137 |
6 files changed, 115 insertions, 80 deletions
diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs index 3b6cc59..0cf2285 100644 --- a/src/runtime/blocking/shutdown.rs +++ b/src/runtime/blocking/shutdown.rs @@ -38,7 +38,7 @@ impl Receiver { use crate::runtime::enter::try_enter; if timeout == Some(Duration::from_nanos(0)) { - return true; + return false; } let mut e = match try_enter(false) { diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs index e792c7d..1f8892e 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/builder.rs @@ -53,7 +53,7 @@ pub struct Builder { worker_threads: Option<usize>, /// Cap on thread usage. - max_threads: usize, + max_blocking_threads: usize, /// Name fn used for threads spawned by the runtime. pub(super) thread_name: ThreadNameFn, @@ -113,7 +113,7 @@ impl Builder { // Default to lazy auto-detection (one thread per CPU core) worker_threads: None, - max_threads: 512, + max_blocking_threads: 512, // Default thread name thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), @@ -209,22 +209,22 @@ impl Builder { self } - /// Specifies limit for threads, spawned by the Runtime. + /// Specifies limit for threads spawned by the Runtime used for blocking operations. /// - /// This is number of threads to be used by Runtime, including `core_threads` - /// Having `max_threads` less than `worker_threads` results in invalid configuration - /// when building multi-threaded `Runtime`, which would cause a panic. /// - /// Similarly to the `worker_threads`, this number should be between 0 and 32,768. + /// Similarly to the `worker_threads`, this number should be between 1 and 32,768. /// /// The default value is 512. /// - /// When multi-threaded runtime is not used, will act as limit on additional threads. + /// Otherwise as `worker_threads` are always active, it limits additional threads (e.g. for + /// blocking annotations). /// - /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for - /// blocking annotations) as `max_threads - core_threads`. - pub fn max_threads(&mut self, val: usize) -> &mut Self { - self.max_threads = val; + /// # Panic + /// + /// This will panic if `val` is not larger than `0`. + pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self { + assert!(val > 0, "Max blocking threads cannot be set to 0"); + self.max_blocking_threads = val; self } @@ -379,6 +379,11 @@ impl Builder { fn get_cfg(&self) -> driver::Cfg { driver::Cfg { + enable_pause_time: match self.kind { + Kind::CurrentThread => true, + #[cfg(feature = "rt-multi-thread")] + Kind::MultiThread => false, + }, enable_io: self.enable_io, enable_time: self.enable_time, } @@ -419,7 +424,7 @@ impl Builder { let spawner = Spawner::Basic(scheduler.spawner().clone()); // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); + let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { @@ -490,10 +495,8 @@ cfg_rt_multi_thread! { use crate::loom::sys::num_cpus; use crate::runtime::{Kind, ThreadPool}; use crate::runtime::park::Parker; - use std::cmp; - let core_threads = self.worker_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); - assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); + let core_threads = self.worker_threads.unwrap_or_else(num_cpus); let (driver, resources) = driver::Driver::new(self.get_cfg())?; @@ -501,7 +504,7 @@ cfg_rt_multi_thread! { let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); + let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); let blocking_spawner = blocking_pool.spawner().clone(); // Create the runtime handle @@ -531,7 +534,7 @@ impl fmt::Debug for Builder { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Builder") .field("worker_threads", &self.worker_threads) - .field("max_threads", &self.max_threads) + .field("max_blocking_threads", &self.max_blocking_threads) .field( "thread_name", &"<dyn Fn() -> String + Send + Sync + 'static>", diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs index e89de9d..b89fa4f 100644 --- a/src/runtime/driver.rs +++ b/src/runtime/driver.rs @@ -103,8 +103,8 @@ cfg_time! { pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option<crate::time::driver::Handle>; - fn create_clock() -> Clock { - crate::time::Clock::new() + fn create_clock(enable_pausing: bool) -> Clock { + crate::time::Clock::new(enable_pausing) } fn create_time_driver( @@ -131,7 +131,7 @@ cfg_not_time! { pub(crate) type Clock = (); pub(crate) type TimeHandle = (); - fn create_clock() -> Clock { + fn create_clock(_enable_pausing: bool) -> Clock { () } @@ -161,13 +161,14 @@ pub(crate) struct Resources { pub(crate) struct Cfg { pub(crate) enable_io: bool, pub(crate) enable_time: bool, + pub(crate) enable_pause_time: bool, } impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; - let clock = create_clock(); + let clock = create_clock(cfg.enable_pause_time); let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, clock.clone()); diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs index 138d13b..6ff3c39 100644 --- a/src/runtime/handle.rs +++ b/src/runtime/handle.rs @@ -142,7 +142,7 @@ impl Handle { F: Future + Send + 'static, F::Output: Send + 'static, { - #[cfg(feature = "tracing")] + #[cfg(all(tokio_unstable, feature = "tracing"))] let future = crate::util::trace::task(future, "task"); self.spawner.spawn(future) } @@ -172,7 +172,7 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - #[cfg(feature = "tracing")] + #[cfg(all(tokio_unstable, feature = "tracing"))] let func = { #[cfg(tokio_track_caller)] let location = std::panic::Location::caller(); diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index d7f068e..2c90acb 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -20,7 +20,7 @@ //! //! ```no_run //! use tokio::net::TcpListener; -//! use tokio::prelude::*; +//! use tokio::io::{AsyncReadExt, AsyncWriteExt}; //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { @@ -63,7 +63,7 @@ //! //! ```no_run //! use tokio::net::TcpListener; -//! use tokio::prelude::*; +//! use tokio::io::{AsyncReadExt, AsyncWriteExt}; //! use tokio::runtime::Runtime; //! //! fn main() -> Result<(), Box<dyn std::error::Error>> { diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs index bc544c9..31712e4 100644 --- a/src/runtime/thread_pool/worker.rs +++ b/src/runtime/thread_pool/worker.rs @@ -78,11 +78,12 @@ pub(super) struct Shared { /// Coordinates idle workers idle: Idle, - /// Workers have have observed the shutdown signal + /// Cores that have observed the shutdown signal /// /// The core is **not** placed back in the worker to avoid it from being /// stolen by a thread that was spawned as part of `block_in_place`. - shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>, + #[allow(clippy::vec_box)] // we're moving an already-boxed value + shutdown_cores: Mutex<Vec<Box<Core>>>, } /// Used to communicate with a worker from other threads. @@ -157,7 +158,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { remotes: remotes.into_boxed_slice(), inject: queue::Inject::new(), idle: Idle::new(size), - shutdown_workers: Mutex::new(vec![]), + shutdown_cores: Mutex::new(vec![]), }); let mut launch = Launch(vec![]); @@ -328,8 +329,10 @@ impl Context { } } + core.pre_shutdown(&self.worker); + // Signal shutdown - self.worker.shared.shutdown(core, self.worker.clone()); + self.worker.shared.shutdown(core); Err(()) } @@ -546,11 +549,9 @@ impl Core { } } - // Shutdown the core - fn shutdown(&mut self, worker: &Worker) { - // Take the core - let mut park = self.park.take().expect("park missing"); - + // Signals all tasks to shut down, and waits for them to complete. Must run + // before we enter the single-threaded phase of shutdown processing. + fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. for header in self.tasks.iter() { header.shutdown(); @@ -564,8 +565,17 @@ impl Core { } // Wait until signalled + let park = self.park.as_mut().expect("park missing"); park.park().expect("park failed"); } + } + + // Shutdown the core + fn shutdown(&mut self) { + assert!(self.tasks.is_empty()); + + // Take the core + let mut park = self.park.take().expect("park missing"); // Drain the queue while self.next_local_task().is_some() {} @@ -629,52 +639,73 @@ impl task::Schedule for Arc<Worker> { fn release(&self, task: &Task) -> Option<Task> { use std::ptr::NonNull; - CURRENT.with(|maybe_cx| { - let cx = maybe_cx.expect("scheduler context missing"); + enum Immediate { + // Task has been synchronously removed from the Core owned by the + // current thread + Removed(Option<Task>), + // Task is owned by another thread, so we need to notify it to clean + // up the task later. + MaybeRemote, + } - if self.eq(&cx.worker) { - let mut maybe_core = cx.core.borrow_mut(); + let immediate = CURRENT.with(|maybe_cx| { + let cx = match maybe_cx { + Some(cx) => cx, + None => return Immediate::MaybeRemote, + }; - if let Some(core) = &mut *maybe_core { - // Directly remove the task - // - // safety: the task is inserted in the list in `bind`. - unsafe { - let ptr = NonNull::from(task.header()); - return core.tasks.remove(ptr); - } - } + if !self.eq(&cx.worker) { + // Task owned by another core, so we need to notify it. + return Immediate::MaybeRemote; } - // Track the task to be released by the worker that owns it - // - // Safety: We get a new handle without incrementing the ref-count. - // A ref-count is held by the "owned" linked list and it is only - // ever removed from that list as part of the release process: this - // method or popping the task from `pending_drop`. Thus, we can rely - // on the ref-count held by the linked-list to keep the memory - // alive. - // - // When the task is removed from the stack, it is forgotten instead - // of dropped. - let task = unsafe { Task::from_raw(task.header().into()) }; - - self.remote().pending_drop.push(task); + let mut maybe_core = cx.core.borrow_mut(); - if cx.core.borrow().is_some() { - return None; + if let Some(core) = &mut *maybe_core { + // Directly remove the task + // + // safety: the task is inserted in the list in `bind`. + unsafe { + let ptr = NonNull::from(task.header()); + return Immediate::Removed(core.tasks.remove(ptr)); + } } - // The worker core has been handed off to another thread. In the - // event that the scheduler is currently shutting down, the thread - // that owns the task may be waiting on the release to complete - // shutdown. - if self.inject().is_closed() { - self.remote().unpark.unpark(); - } + Immediate::MaybeRemote + }); - None - }) + // Checks if we were called from within a worker, allowing for immediate + // removal of a scheduled task. Else we have to go through the slower + // process below where we remotely mark a task as dropped. + match immediate { + Immediate::Removed(task) => return task, + Immediate::MaybeRemote => (), + }; + + // Track the task to be released by the worker that owns it + // + // Safety: We get a new handle without incrementing the ref-count. + // A ref-count is held by the "owned" linked list and it is only + // ever removed from that list as part of the release process: this + // method or popping the task from `pending_drop`. Thus, we can rely + // on the ref-count held by the linked-list to keep the memory + // alive. + // + // When the task is removed from the stack, it is forgotten instead + // of dropped. + let task = unsafe { Task::from_raw(task.header().into()) }; + + self.remote().pending_drop.push(task); + + // The worker core has been handed off to another thread. In the + // event that the scheduler is currently shutting down, the thread + // that owns the task may be waiting on the release to complete + // shutdown. + if self.inject().is_closed() { + self.remote().unpark.unpark(); + } + + None } fn schedule(&self, task: Notified) { @@ -779,16 +810,16 @@ impl Shared { /// its core back into its handle. /// /// If all workers have reached this point, the final cleanup is performed. - fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) { - let mut workers = self.shutdown_workers.lock(); - workers.push((core, worker)); + fn shutdown(&self, core: Box<Core>) { + let mut cores = self.shutdown_cores.lock(); + cores.push(core); - if workers.len() != self.remotes.len() { + if cores.len() != self.remotes.len() { return; } - for (mut core, worker) in workers.drain(..) { - core.shutdown(&worker); + for mut core in cores.drain(..) { + core.shutdown(); } // Drain the injection queue |