diff options
Diffstat (limited to 'src/runtime/mod.rs')
-rw-r--r-- | src/runtime/mod.rs | 733 |
1 files changed, 354 insertions, 379 deletions
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 300a146..be4aa38 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,8 +1,7 @@ //! The Tokio runtime. //! -//! Unlike other Rust programs, asynchronous applications require -//! runtime support. In particular, the following runtime services are -//! necessary: +//! Unlike other Rust programs, asynchronous applications require runtime +//! support. In particular, the following runtime services are necessary: //! //! * An **I/O event loop**, called the driver, which drives I/O resources and //! dispatches I/O events to tasks that depend on them. @@ -10,14 +9,14 @@ //! * A **timer** for scheduling work to run after a set period of time. //! //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing -//! them to be started, shut down, and configured together. However, most -//! applications won't need to use [`Runtime`] directly. Instead, they can -//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under -//! the hood. +//! them to be started, shut down, and configured together. However, often it is +//! not required to configure a [`Runtime`] manually, and user may just use the +//! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood. //! //! # Usage //! -//! Most applications will use the [`tokio::main`] attribute macro. +//! When no fine tuning is required, the [`tokio::main`] attribute macro can be +//! used. //! //! ```no_run //! use tokio::net::TcpListener; @@ -25,7 +24,7 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { -//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; @@ -69,11 +68,11 @@ //! //! fn main() -> Result<(), Box<dyn std::error::Error>> { //! // Create the runtime -//! let mut rt = Runtime::new()?; +//! let rt = Runtime::new()?; //! //! // Spawn the root task //! rt.block_on(async { -//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; @@ -111,48 +110,38 @@ //! applications. The [runtime builder] or `#[tokio::main]` attribute may be //! used to select which scheduler to use. //! -//! #### Basic Scheduler +//! #### Multi-Thread Scheduler //! -//! The basic scheduler provides a _single-threaded_ future executor. All tasks -//! will be created and executed on the current thread. The basic scheduler -//! requires the `rt-core` feature flag, and can be selected using the -//! [`Builder::basic_scheduler`] method: +//! The multi-thread scheduler executes futures on a _thread pool_, using a +//! work-stealing strategy. By default, it will start a worker thread for each +//! CPU core available on the system. This tends to be the ideal configurations +//! for most applications. The multi-thread scheduler requires the `rt-multi-thread` +//! feature flag, and is selected by default: //! ``` //! use tokio::runtime; //! //! # fn main() -> Result<(), Box<dyn std::error::Error>> { -//! let basic_rt = runtime::Builder::new() -//! .basic_scheduler() -//! .build()?; +//! let threaded_rt = runtime::Runtime::new()?; //! # Ok(()) } //! ``` //! -//! If the `rt-core` feature is enabled and `rt-threaded` is not, -//! [`Runtime::new`] will return a basic scheduler runtime by default. +//! Most applications should use the multi-thread scheduler, except in some +//! niche use-cases, such as when running only a single thread is required. //! -//! #### Threaded Scheduler +//! #### Current-Thread Scheduler //! -//! The threaded scheduler executes futures on a _thread pool_, using a -//! work-stealing strategy. By default, it will start a worker thread for each -//! CPU core available on the system. This tends to be the ideal configurations -//! for most applications. The threaded scheduler requires the `rt-threaded` feature -//! flag, and can be selected using the [`Builder::threaded_scheduler`] method: +//! The current-thread scheduler provides a _single-threaded_ future executor. +//! All tasks will be created and executed on the current thread. This requires +//! the `rt` feature flag. //! ``` //! use tokio::runtime; //! //! # fn main() -> Result<(), Box<dyn std::error::Error>> { -//! let threaded_rt = runtime::Builder::new() -//! .threaded_scheduler() +//! let basic_rt = runtime::Builder::new_current_thread() //! .build()?; //! # Ok(()) } //! ``` //! -//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a -//! threaded scheduler runtime by default. -//! -//! Most applications should use the threaded scheduler, except in some niche -//! use-cases, such as when running only a single thread is required. -//! //! #### Resource drivers //! //! When configuring a runtime by hand, no resource drivers are enabled by @@ -164,8 +153,8 @@ //! ## Lifetime of spawned threads //! //! The runtime may spawn threads depending on its configuration and usage. The -//! threaded scheduler spawns threads to schedule tasks and calls to -//! `spawn_blocking` spawn threads to run blocking operations. +//! multi-thread scheduler spawns threads to schedule tasks and for `spawn_blocking` +//! calls. //! //! While the `Runtime` is active, threads may shutdown after periods of being //! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown. @@ -188,394 +177,380 @@ #[macro_use] mod tests; -pub(crate) mod context; +pub(crate) mod enter; + +pub(crate) mod task; -cfg_rt_core! { +cfg_rt! { mod basic_scheduler; use basic_scheduler::BasicScheduler; - pub(crate) mod task; -} - -mod blocking; -use blocking::BlockingPool; + mod blocking; + use blocking::BlockingPool; + pub(crate) use blocking::spawn_blocking; -cfg_blocking_impl! { - #[allow(unused_imports)] - pub(crate) use blocking::{spawn_blocking, try_spawn_blocking}; -} + mod builder; + pub use self::builder::Builder; -mod builder; -pub use self::builder::Builder; + pub(crate) mod context; + pub(crate) mod driver; -pub(crate) mod enter; -use self::enter::enter; + use self::enter::enter; -mod handle; -pub use self::handle::{Handle, TryCurrentError}; + mod handle; + use handle::Handle; -mod io; + mod spawner; + use self::spawner::Spawner; +} -cfg_rt_threaded! { +cfg_rt_multi_thread! { mod park; use park::Parker; } -mod shell; -use self::shell::Shell; - -mod spawner; -use self::spawner::Spawner; - -mod time; - -cfg_rt_threaded! { +cfg_rt_multi_thread! { mod queue; pub(crate) mod thread_pool; use self::thread_pool::ThreadPool; } -cfg_rt_core! { +cfg_rt! { use crate::task::JoinHandle; -} - -use std::future::Future; -use std::time::Duration; - -/// The Tokio runtime. -/// -/// The runtime provides an I/O driver, task scheduler, [timer], and blocking -/// pool, necessary for running asynchronous tasks. -/// -/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, -/// most users will use the `#[tokio::main]` annotation on their entry point instead. -/// -/// See [module level][mod] documentation for more details. -/// -/// # Shutdown -/// -/// Shutting down the runtime is done by dropping the value. The current thread -/// will block until the shut down operation has completed. -/// -/// * Drain any scheduled work queues. -/// * Drop any futures that have not yet completed. -/// * Drop the reactor. -/// -/// Once the reactor has dropped, any outstanding I/O resources bound to -/// that reactor will no longer function. Calling any method on them will -/// result in an error. -/// -/// [timer]: crate::time -/// [mod]: index.html -/// [`new`]: method@Self::new -/// [`Builder`]: struct@Builder -/// [`tokio::run`]: fn@run -#[derive(Debug)] -pub struct Runtime { - /// Task executor - kind: Kind, - - /// Handle to runtime, also contains driver handles - handle: Handle, - - /// Blocking pool handle, used to signal shutdown - blocking_pool: BlockingPool, -} - -/// The runtime executor is either a thread-pool or a current-thread executor. -#[derive(Debug)] -enum Kind { - /// Not able to execute concurrent tasks. This variant is mostly used to get - /// access to the driver handles. - Shell(Shell), - /// Execute all tasks on the current-thread. - #[cfg(feature = "rt-core")] - Basic(BasicScheduler<time::Driver>), + use std::future::Future; + use std::time::Duration; - /// Execute tasks across multiple threads. - #[cfg(feature = "rt-threaded")] - ThreadPool(ThreadPool), -} - -/// After thread starts / before thread stops -type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; - -impl Runtime { - /// Create a new runtime instance with default configuration values. + /// The Tokio runtime. /// - /// This results in a scheduler, I/O driver, and time driver being - /// initialized. The type of scheduler used depends on what feature flags - /// are enabled: if the `rt-threaded` feature is enabled, the [threaded - /// scheduler] is used, while if only the `rt-core` feature is enabled, the - /// [basic scheduler] is used instead. + /// The runtime provides an I/O driver, task scheduler, [timer], and + /// blocking pool, necessary for running asynchronous tasks. /// - /// If the threaded scheduler is selected, it will not spawn - /// any worker threads until it needs to, i.e. tasks are scheduled to run. - /// - /// Most applications will not need to call this function directly. Instead, - /// they will use the [`#[tokio::main]` attribute][main]. When more complex - /// configuration is necessary, the [runtime builder] may be used. + /// Instances of `Runtime` can be created using [`new`], or [`Builder`]. + /// However, most users will use the `#[tokio::main]` annotation on their + /// entry point instead. /// /// See [module level][mod] documentation for more details. /// - /// # Examples - /// - /// Creating a new `Runtime` with default configuration values. + /// # Shutdown /// - /// ``` - /// use tokio::runtime::Runtime; + /// Shutting down the runtime is done by dropping the value. The current + /// thread will block until the shut down operation has completed. /// - /// let rt = Runtime::new() - /// .unwrap(); + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. /// - /// // Use the runtime... - /// ``` + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. /// - /// [mod]: index.html - /// [main]: ../attr.main.html - /// [threaded scheduler]: index.html#threaded-scheduler - /// [basic scheduler]: index.html#basic-scheduler - /// [runtime builder]: crate::runtime::Builder - pub fn new() -> io::Result<Runtime> { - #[cfg(feature = "rt-threaded")] - let ret = Builder::new().threaded_scheduler().enable_all().build(); - - #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))] - let ret = Builder::new().basic_scheduler().enable_all().build(); - - #[cfg(not(feature = "rt-core"))] - let ret = Builder::new().enable_all().build(); - - ret - } - - /// Spawn a future onto the Tokio runtime. + /// # Sharing /// - /// This spawns the given future onto the runtime's executor, usually a - /// thread pool. The thread pool is then responsible for polling the future - /// until it completes. + /// The Tokio runtime implements `Sync` and `Send` to allow you to wrap it + /// in a `Arc`. Most fn take `&self` to allow you to call them concurrently + /// accross multiple threads. /// - /// See [module level][mod] documentation for more details. + /// Calls to `shutdown` and `shutdown_timeout` require exclusive ownership of + /// the runtime type and this can be achieved via `Arc::try_unwrap` when only + /// one strong count reference is left over. /// + /// [timer]: crate::time /// [mod]: index.html - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// - /// // Spawn a future onto the runtime - /// rt.spawn(async { - /// println!("now running on a worker thread"); - /// }); - /// # } - /// ``` - /// - /// # Panics - /// - /// This function will not panic unless task execution is disabled on the - /// executor. This can only happen if the runtime was built using - /// [`Builder`] without picking either [`basic_scheduler`] or - /// [`threaded_scheduler`]. - /// + /// [`new`]: method@Self::new /// [`Builder`]: struct@Builder - /// [`threaded_scheduler`]: fn@Builder::threaded_scheduler - /// [`basic_scheduler`]: fn@Builder::basic_scheduler - #[cfg(feature = "rt-core")] - pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - match &self.kind { - Kind::Shell(_) => panic!("task execution disabled"), - #[cfg(feature = "rt-threaded")] - Kind::ThreadPool(exec) => exec.spawn(future), - Kind::Basic(exec) => exec.spawn(future), - } - } + #[derive(Debug)] + pub struct Runtime { + /// Task executor + kind: Kind, - /// Run a future to completion on the Tokio runtime. This is the runtime's - /// entry point. - /// - /// This runs the given future on the runtime, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers which - /// the future spawns internally will be executed on the runtime. - /// - /// `&mut` is required as calling `block_on` **may** result in advancing the - /// state of the runtime. The details depend on how the runtime is - /// configured. [`runtime::Handle::block_on`][handle] provides a version - /// that takes `&self`. - /// - /// This method may not be called from an asynchronous context. - /// - /// # Panics - /// - /// This function panics if the provided future panics, or if called within an - /// asynchronous execution context. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::runtime::Runtime; - /// - /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); - /// - /// // Execute the future, blocking the current thread until completion - /// rt.block_on(async { - /// println!("hello"); - /// }); - /// ``` - /// - /// [handle]: fn@Handle::block_on - pub fn block_on<F: Future>(&mut self, future: F) -> F::Output { - let kind = &mut self.kind; + /// Handle to runtime, also contains driver handles + handle: Handle, - self.handle.enter(|| match kind { - Kind::Shell(exec) => exec.block_on(future), - #[cfg(feature = "rt-core")] - Kind::Basic(exec) => exec.block_on(future), - #[cfg(feature = "rt-threaded")] - Kind::ThreadPool(exec) => exec.block_on(future), - }) + /// Blocking pool handle, used to signal shutdown + blocking_pool: BlockingPool, } - /// Enter the runtime context. This allows you to construct types that must - /// have an executor available on creation such as [`Delay`] or [`TcpStream`]. - /// It will also allow you to call methods such as [`tokio::spawn`]. - /// - /// This function is also available as [`Handle::enter`]. - /// - /// [`Delay`]: struct@crate::time::Delay - /// [`TcpStream`]: struct@crate::net::TcpStream - /// [`Handle::enter`]: fn@crate::runtime::Handle::enter - /// [`tokio::spawn`]: fn@crate::spawn - /// - /// # Example - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// fn function_that_spawns(msg: String) { - /// // Had we not used `rt.enter` below, this would panic. - /// tokio::spawn(async move { - /// println!("{}", msg); - /// }); - /// } + /// Runtime context guard. /// - /// fn main() { - /// let rt = Runtime::new().unwrap(); - /// - /// let s = "Hello World!".to_string(); - /// - /// // By entering the context, we tie `tokio::spawn` to this executor. - /// rt.enter(|| function_that_spawns(s)); - /// } - /// ``` - pub fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - self.handle.enter(f) + /// Returned by [`Runtime::enter`], the context guard exits the runtime + /// context on drop. + #[derive(Debug)] + pub struct EnterGuard<'a> { + rt: &'a Runtime, + guard: context::EnterGuard, } - /// Return a handle to the runtime's spawner. - /// - /// The returned handle can be used to spawn tasks that run on this runtime, and can - /// be cloned to allow moving the `Handle` to other threads. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// let handle = rt.handle(); - /// - /// handle.spawn(async { println!("hello"); }); - /// ``` - pub fn handle(&self) -> &Handle { - &self.handle - } + /// The runtime executor is either a thread-pool or a current-thread executor. + #[derive(Debug)] + enum Kind { + /// Execute all tasks on the current-thread. + CurrentThread(BasicScheduler<driver::Driver>), - /// Shutdown the runtime, waiting for at most `duration` for all spawned - /// task to shutdown. - /// - /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to - /// shutdown in a timely fashion. However, dropping a `Runtime` will wait - /// indefinitely for all tasks to terminate, and there are cases where a long - /// blocking task has been spawned, which can block dropping `Runtime`. - /// - /// In this case, calling `shutdown_timeout` with an explicit wait timeout - /// can work. The `shutdown_timeout` will signal all tasks to shutdown and - /// will wait for at most `duration` for all spawned tasks to terminate. If - /// `timeout` elapses before all tasks are dropped, the function returns and - /// outstanding tasks are potentially leaked. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::task; - /// - /// use std::thread; - /// use std::time::Duration; - /// - /// fn main() { - /// let mut runtime = Runtime::new().unwrap(); - /// - /// runtime.block_on(async move { - /// task::spawn_blocking(move || { - /// thread::sleep(Duration::from_secs(10_000)); - /// }); - /// }); - /// - /// runtime.shutdown_timeout(Duration::from_millis(100)); - /// } - /// ``` - pub fn shutdown_timeout(self, duration: Duration) { - let Runtime { - mut blocking_pool, .. - } = self; - blocking_pool.shutdown(Some(duration)); + /// Execute tasks across multiple threads. + #[cfg(feature = "rt-multi-thread")] + ThreadPool(ThreadPool), } - /// Shutdown the runtime, without waiting for any spawned tasks to shutdown. - /// - /// This can be useful if you want to drop a runtime from within another runtime. - /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks - /// to complete, which would normally not be permitted within an asynchronous context. - /// By calling `shutdown_background()`, you can drop the runtime from such a context. - /// - /// Note however, that because we do not wait for any blocking tasks to complete, this - /// may result in a resource leak (in that any blocking tasks are still running until they - /// return. - /// - /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`. - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// fn main() { - /// let mut runtime = Runtime::new().unwrap(); - /// - /// runtime.block_on(async move { - /// let inner_runtime = Runtime::new().unwrap(); - /// // ... - /// inner_runtime.shutdown_background(); - /// }); - /// } - /// ``` - pub fn shutdown_background(self) { - self.shutdown_timeout(Duration::from_nanos(0)) + /// After thread starts / before thread stops + type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; + + impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in the multi threaded scheduler, I/O driver, and time driver being + /// initialized. + /// + /// Most applications will not need to call this function directly. Instead, + /// they will use the [`#[tokio::main]` attribute][main]. When a more complex + /// configuration is necessary, the [runtime builder] may be used. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// ``` + /// + /// [mod]: index.html + /// [main]: ../attr.main.html + /// [threaded scheduler]: index.html#threaded-scheduler + /// [basic scheduler]: index.html#basic-scheduler + /// [runtime builder]: crate::runtime::Builder + #[cfg(feature = "rt-multi-thread")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + pub fn new() -> std::io::Result<Runtime> { + Builder::new_multi_thread().enable_all().build() + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match &self.kind { + #[cfg(feature = "rt-multi-thread")] + Kind::ThreadPool(exec) => exec.spawn(future), + Kind::CurrentThread(exec) => exec.spawn(future), + } + } + + /// Run the provided function on an executor dedicated to blocking operations. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a blocking function onto the runtime + /// rt.spawn_blocking(|| { + /// println!("now running on a worker thread"); + /// }); + /// # } + pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> + where + F: FnOnce() -> R + Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Run a future to completion on the Tokio runtime. This is the + /// runtime's entry point. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers + /// which the future spawns internally will be executed on the runtime. + /// + /// # Multi thread scheduler + /// + /// When the multi thread scheduler is used this will allow futures + /// to run within the io driver and timer context of the overall runtime. + /// + /// # Current thread scheduler + /// + /// When the current thread scheduler is enabled `block_on` + /// can be called concurrently from multiple threads. The first call + /// will take ownership of the io and timer drivers. This means + /// other threads which do not own the drivers will hook into that one. + /// When the first `block_on` completes, other threads will be able to + /// "steal" the driver to allow continued execution of their futures. + /// + /// # Panics + /// + /// This function panics if the provided future panics, or if not called within an + /// asynchronous execution context. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Execute the future, blocking the current thread until completion + /// rt.block_on(async { + /// println!("hello"); + /// }); + /// ``` + /// + /// [handle]: fn@Handle::block_on + pub fn block_on<F: Future>(&self, future: F) -> F::Output { + let _enter = self.enter(); + + match &self.kind { + Kind::CurrentThread(exec) => exec.block_on(future), + #[cfg(feature = "rt-multi-thread")] + Kind::ThreadPool(exec) => exec.block_on(future), + } + } + + /// Enter the runtime context. + /// + /// This allows you to construct types that must have an executor + /// available on creation such as [`Sleep`] or [`TcpStream`]. It will + /// also allow you to call methods such as [`tokio::spawn`]. + /// + /// [`Sleep`]: struct@crate::time::Sleep + /// [`TcpStream`]: struct@crate::net::TcpStream + /// [`tokio::spawn`]: fn@crate::spawn + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// fn function_that_spawns(msg: String) { + /// // Had we not used `rt.enter` below, this would panic. + /// tokio::spawn(async move { + /// println!("{}", msg); + /// }); + /// } + /// + /// fn main() { + /// let rt = Runtime::new().unwrap(); + /// + /// let s = "Hello World!".to_string(); + /// + /// // By entering the context, we tie `tokio::spawn` to this executor. + /// let _guard = rt.enter(); + /// function_that_spawns(s); + /// } + /// ``` + pub fn enter(&self) -> EnterGuard<'_> { + EnterGuard { + rt: self, + guard: context::enter(self.handle.clone()), + } + } + + /// Shutdown the runtime, waiting for at most `duration` for all spawned + /// task to shutdown. + /// + /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to + /// shutdown in a timely fashion. However, dropping a `Runtime` will wait + /// indefinitely for all tasks to terminate, and there are cases where a long + /// blocking task has been spawned, which can block dropping `Runtime`. + /// + /// In this case, calling `shutdown_timeout` with an explicit wait timeout + /// can work. The `shutdown_timeout` will signal all tasks to shutdown and + /// will wait for at most `duration` for all spawned tasks to terminate. If + /// `timeout` elapses before all tasks are dropped, the function returns and + /// outstanding tasks are potentially leaked. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// use std::thread; + /// use std::time::Duration; + /// + /// fn main() { + /// let runtime = Runtime::new().unwrap(); + /// + /// runtime.block_on(async move { + /// task::spawn_blocking(move || { + /// thread::sleep(Duration::from_secs(10_000)); + /// }); + /// }); + /// + /// runtime.shutdown_timeout(Duration::from_millis(100)); + /// } + /// ``` + pub fn shutdown_timeout(mut self, duration: Duration) { + // Wakeup and shutdown all the worker threads + self.handle.spawner.shutdown(); + self.blocking_pool.shutdown(Some(duration)); + } + + /// Shutdown the runtime, without waiting for any spawned tasks to shutdown. + /// + /// This can be useful if you want to drop a runtime from within another runtime. + /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks + /// to complete, which would normally not be permitted within an asynchronous context. + /// By calling `shutdown_background()`, you can drop the runtime from such a context. + /// + /// Note however, that because we do not wait for any blocking tasks to complete, this + /// may result in a resource leak (in that any blocking tasks are still running until they + /// return. + /// + /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// fn main() { + /// let runtime = Runtime::new().unwrap(); + /// + /// runtime.block_on(async move { + /// let inner_runtime = Runtime::new().unwrap(); + /// // ... + /// inner_runtime.shutdown_background(); + /// }); + /// } + /// ``` + pub fn shutdown_background(self) { + self.shutdown_timeout(Duration::from_nanos(0)) + } } } |