diff options
Diffstat (limited to 'src/runtime')
28 files changed, 1491 insertions, 1426 deletions
diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs index 7e1c257..5ca8467 100644 --- a/src/runtime/basic_scheduler.rs +++ b/src/runtime/basic_scheduler.rs @@ -1,22 +1,35 @@ +use crate::future::poll_fn; +use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; -use crate::runtime; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; -use crate::util::linked_list::LinkedList; -use crate::util::{waker_ref, Wake}; +use crate::sync::notify::Notify; +use crate::util::linked_list::{Link, LinkedList}; +use crate::util::{waker_ref, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::{Arc, Mutex}; -use std::task::Poll::Ready; +use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; use std::time::Duration; /// Executes tasks on the current thread -pub(crate) struct BasicScheduler<P> -where - P: Park, -{ +pub(crate) struct BasicScheduler<P: Park> { + /// Inner state guarded by a mutex that is shared + /// between all `block_on` calls. + inner: Mutex<Option<Inner<P>>>, + + /// Notifier for waking up other threads to steal the + /// parker. + notify: Notify, + + /// Sendable task spawner + spawner: Spawner, +} + +/// The inner scheduler that owns the task queue and the main parker P. +struct Inner<P: Park> { /// Scheduler run queue /// /// When the scheduler is executed, the queue is removed from `self` and @@ -42,7 +55,7 @@ pub(crate) struct Spawner { struct Tasks { /// Collection of all active tasks spawned onto this executor. - owned: LinkedList<Task<Arc<Shared>>>, + owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>, /// Local run queue. /// @@ -59,7 +72,7 @@ struct Shared { unpark: Box<dyn Unpark>, } -/// Thread-local context +/// Thread-local context. struct Context { /// Shared scheduler state shared: Arc<Shared>, @@ -68,38 +81,43 @@ struct Context { tasks: RefCell<Tasks>, } -/// Initial queue capacity +/// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -/// How often ot check the remote queue first +/// How often to check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; -// Tracks the current BasicScheduler +// Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); -impl<P> BasicScheduler<P> -where - P: Park, -{ +impl<P: Park> BasicScheduler<P> { pub(crate) fn new(park: P) -> BasicScheduler<P> { let unpark = Box::new(park.unpark()); - BasicScheduler { + let spawner = Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + unpark: unpark as Box<dyn Unpark>, + }), + }; + + let inner = Mutex::new(Some(Inner { tasks: Some(Tasks { owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), - spawner: Spawner { - shared: Arc::new(Shared { - queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), - unpark: unpark as Box<dyn Unpark>, - }), - }, + spawner: spawner.clone(), tick: 0, park, + })); + + BasicScheduler { + inner, + notify: Notify::new(), + spawner, } } @@ -116,13 +134,57 @@ where self.spawner.spawn(future) } - pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output - where - F: Future, - { + pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output { + pin!(future); + + // Attempt to steal the dedicated parker and block_on the future if we can there, + // othwerwise, lets select on a notification that the parker is available + // or the future is complete. + loop { + if let Some(inner) = &mut self.take_inner() { + return inner.block_on(future); + } else { + let mut enter = crate::runtime::enter(false); + + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } + + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } + } + } + + fn take_inner(&self) -> Option<InnerGuard<'_, P>> { + let inner = self.inner.lock().take()?; + + Some(InnerGuard { + inner: Some(inner), + basic_scheduler: &self, + }) + } +} + +impl<P: Park> Inner<P> { + /// Block on the future provided and drive the runtime's driver. + fn block_on<F: Future>(&mut self, future: F) -> F::Output { enter(self, |scheduler, context| { - let _enter = runtime::enter(false); - let waker = waker_ref(&scheduler.spawner.shared); + let _enter = crate::runtime::enter(false); + let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); pin!(future); @@ -177,16 +239,16 @@ where /// Enter the scheduler context. This sets the queue and other necessary /// scheduler state in the thread-local -fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R +fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R where - F: FnOnce(&mut BasicScheduler<P>, &Context) -> R, + F: FnOnce(&mut Inner<P>, &Context) -> R, P: Park, { // Ensures the run queue is placed back in the `BasicScheduler` instance // once `block_on` returns.` struct Guard<'a, P: Park> { context: Option<Context>, - scheduler: &'a mut BasicScheduler<P>, + scheduler: &'a mut Inner<P>, } impl<P: Park> Drop for Guard<'_, P> { @@ -213,12 +275,18 @@ where CURRENT.set(context, || f(scheduler, context)) } -impl<P> Drop for BasicScheduler<P> -where - P: Park, -{ +impl<P: Park> Drop for BasicScheduler<P> { fn drop(&mut self) { - enter(self, |scheduler, context| { + // Avoid a double panic if we are currently panicking and + // the lock may be poisoned. + + let mut inner = match self.inner.lock().take() { + Some(inner) => inner, + None if std::thread::panicking() => return, + None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), + }; + + enter(&mut inner, |scheduler, context| { // Loop required here to ensure borrow is dropped between iterations #[allow(clippy::while_let_loop)] loop { @@ -236,7 +304,7 @@ where } // Drain remote queue - for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) { + for task in scheduler.spawner.shared.queue.lock().drain(..) { task.shutdown(); } @@ -266,7 +334,11 @@ impl Spawner { } fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { - self.shared.queue.lock().unwrap().pop_front() + self.shared.queue.lock().pop_front() + } + + fn waker_ref(&self) -> WakerRef<'_> { + waker_ref(&self.shared) } } @@ -307,7 +379,7 @@ impl Schedule for Arc<Shared> { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().unwrap().push_back(task); + self.queue.lock().push_back(task); self.unpark.unpark(); } }); @@ -324,3 +396,37 @@ impl Wake for Shared { arc_self.unpark.unpark(); } } + +// ===== InnerGuard ===== + +/// Used to ensure we always place the Inner value +/// back into its slot in `BasicScheduler`, even if the +/// future panics. +struct InnerGuard<'a, P: Park> { + inner: Option<Inner<P>>, + basic_scheduler: &'a BasicScheduler<P>, +} + +impl<P: Park> InnerGuard<'_, P> { + fn block_on<F: Future>(&mut self, future: F) -> F::Output { + // The only time inner gets set to `None` is if we have dropped + // already so this unwrap is safe. + self.inner.as_mut().unwrap().block_on(future) + } +} + +impl<P: Park> Drop for InnerGuard<'_, P> { + fn drop(&mut self) { + if let Some(scheduler) = self.inner.take() { + let mut lock = self.basic_scheduler.inner.lock(); + + // Replace old scheduler back into the state to allow + // other threads to pick it up and drive it. + lock.replace(scheduler); + + // Wake up other possible threads that could steal + // the dedicated parker P. + self.basic_scheduler.notify.notify_one() + } + } +} diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs index 0b36a75..fece3c2 100644 --- a/src/runtime/blocking/mod.rs +++ b/src/runtime/blocking/mod.rs @@ -3,22 +3,20 @@ //! shells. This isolates the complexity of dealing with conditional //! compilation. -cfg_blocking_impl! { - mod pool; - pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner}; +mod pool; +pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; - mod schedule; - mod shutdown; - pub(crate) mod task; +mod schedule; +mod shutdown; +pub(crate) mod task; - use crate::runtime::Builder; - - pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { - BlockingPool::new(builder, thread_cap) +use crate::runtime::Builder; - } +pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { + BlockingPool::new(builder, thread_cap) } +/* cfg_not_blocking_impl! { use crate::runtime::Builder; use std::time::Duration; @@ -41,3 +39,4 @@ cfg_not_blocking_impl! { } } } +*/ diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs index 40d417b..2967a10 100644 --- a/src/runtime/blocking/pool.rs +++ b/src/runtime/blocking/pool.rs @@ -5,9 +5,13 @@ use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; use crate::runtime::blocking::task::BlockingTask; +use crate::runtime::builder::ThreadNameFn; +use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; +use slab::Slab; + use std::collections::VecDeque; use std::fmt; use std::time::Duration; @@ -30,7 +34,7 @@ struct Inner { condvar: Condvar, /// Spawned threads use this name - thread_name: String, + thread_name: ThreadNameFn, /// Spawned thread stack size stack_size: Option<usize>, @@ -41,7 +45,11 @@ struct Inner { /// Call before a thread stops before_stop: Option<Callback>, + // Maximum number of threads thread_cap: usize, + + // Customizable wait timeout + keep_alive: Duration, } struct Shared { @@ -51,6 +59,7 @@ struct Shared { num_notify: u32, shutdown: bool, shutdown_tx: Option<shutdown::Sender>, + worker_threads: Slab<thread::JoinHandle<()>>, } type Task = task::Notified<NoopSchedule>; @@ -62,11 +71,8 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, { - let rt = Handle::current(); - - let (task, handle) = task::joinable(BlockingTask::new(func)); - let _ = rt.blocking_spawner.spawn(task, &rt); - handle + let rt = context::current().expect("not currently running on the Tokio runtime."); + rt.spawn_blocking(func) } #[allow(dead_code)] @@ -74,7 +80,7 @@ pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()> where F: FnOnce() -> R + Send + 'static, { - let rt = Handle::current(); + let rt = context::current().expect("not currently running on the Tokio runtime."); let (task, _handle) = task::joinable(BlockingTask::new(func)); rt.blocking_spawner.spawn(task, &rt) @@ -85,6 +91,7 @@ where impl BlockingPool { pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { let (shutdown_tx, shutdown_rx) = shutdown::channel(); + let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE); BlockingPool { spawner: Spawner { @@ -96,6 +103,7 @@ impl BlockingPool { num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), + worker_threads: Slab::new(), }), condvar: Condvar::new(), thread_name: builder.thread_name.clone(), @@ -103,6 +111,7 @@ impl BlockingPool { after_start: builder.after_start.clone(), before_stop: builder.before_stop.clone(), thread_cap, + keep_alive, }), }, shutdown_rx, @@ -114,7 +123,7 @@ impl BlockingPool { } pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) { - let mut shared = self.spawner.inner.shared.lock().unwrap(); + let mut shared = self.spawner.inner.shared.lock(); // The function can be called multiple times. First, by explicitly // calling `shutdown` then by the drop handler calling `shutdown`. This @@ -126,10 +135,15 @@ impl BlockingPool { shared.shutdown = true; shared.shutdown_tx = None; self.spawner.inner.condvar.notify_all(); + let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new()); drop(shared); - self.shutdown_rx.wait(timeout); + if self.shutdown_rx.wait(timeout) { + for handle in workers.drain() { + let _ = handle.join(); + } + } } } @@ -150,7 +164,7 @@ impl fmt::Debug for BlockingPool { impl Spawner { pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { let shutdown_tx = { - let mut shared = self.inner.shared.lock().unwrap(); + let mut shared = self.inner.shared.lock(); if shared.shutdown { // Shutdown the task @@ -187,14 +201,24 @@ impl Spawner { }; if let Some(shutdown_tx) = shutdown_tx { - self.spawn_thread(shutdown_tx, rt); + let mut shared = self.inner.shared.lock(); + let entry = shared.worker_threads.vacant_entry(); + + let handle = self.spawn_thread(shutdown_tx, rt, entry.key()); + + entry.insert(handle); } Ok(()) } - fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { - let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); + fn spawn_thread( + &self, + shutdown_tx: shutdown::Sender, + rt: &Handle, + worker_id: usize, + ) -> thread::JoinHandle<()> { + let mut builder = thread::Builder::new().name((self.inner.thread_name)()); if let Some(stack_size) = self.inner.stack_size { builder = builder.stack_size(stack_size); @@ -205,23 +229,21 @@ impl Spawner { builder .spawn(move || { // Only the reference should be moved into the closure - let rt = &rt; - rt.enter(move || { - rt.blocking_spawner.inner.run(); - drop(shutdown_tx); - }) + let _enter = crate::runtime::context::enter(rt.clone()); + rt.blocking_spawner.inner.run(worker_id); + drop(shutdown_tx); }) - .unwrap(); + .unwrap() } } impl Inner { - fn run(&self) { + fn run(&self, worker_id: usize) { if let Some(f) = &self.after_start { f() } - let mut shared = self.shared.lock().unwrap(); + let mut shared = self.shared.lock(); 'main: loop { // BUSY @@ -229,14 +251,14 @@ impl Inner { drop(shared); task.run(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // IDLE shared.num_idle += 1; while !shared.shutdown { - let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); + let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); shared = lock_result.0; let timeout_result = lock_result.1; @@ -252,6 +274,8 @@ impl Inner { // Even if the condvar "timed out", if the pool is entering the // shutdown phase, we want to perform the cleanup logic. if !shared.shutdown && timeout_result.timed_out() { + shared.worker_threads.remove(worker_id); + break 'main; } @@ -264,7 +288,7 @@ impl Inner { drop(shared); task.shutdown(); - shared = self.shared.lock().unwrap(); + shared = self.shared.lock(); } // Work was produced, and we "took" it (by decrementing num_notify). diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs index e76a701..3b6cc59 100644 --- a/src/runtime/blocking/shutdown.rs +++ b/src/runtime/blocking/shutdown.rs @@ -32,11 +32,13 @@ impl Receiver { /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout` /// duration. If `timeout` is `None`, then the thread is blocked until the /// shutdown signal is received. - pub(crate) fn wait(&mut self, timeout: Option<Duration>) { + /// + /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`. + pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool { use crate::runtime::enter::try_enter; if timeout == Some(Duration::from_nanos(0)) { - return; + return true; } let mut e = match try_enter(false) { @@ -44,7 +46,7 @@ impl Receiver { _ => { if std::thread::panicking() { // Don't panic in a panic - return; + return false; } else { panic!( "Cannot drop a runtime in a context where blocking is not allowed. \ @@ -60,9 +62,10 @@ impl Receiver { // current thread (usually, shutting down a runtime stored in a // thread-local). if let Some(timeout) = timeout { - let _ = e.block_on_timeout(&mut self.rx, timeout); + e.block_on_timeout(&mut self.rx, timeout).is_ok() } else { let _ = e.block_on(&mut self.rx); + true } } } diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs index fad72c7..e792c7d 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/builder.rs @@ -1,23 +1,24 @@ use crate::runtime::handle::Handle; -use crate::runtime::shell::Shell; -use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; +use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; use std::fmt; -#[cfg(not(loom))] -use std::sync::Arc; +use std::io; +use std::time::Duration; /// Builds Tokio Runtime with custom configuration values. /// /// Methods can be chained in order to set the configuration values. The /// Runtime is constructed by calling [`build`]. /// -/// New instances of `Builder` are obtained via [`Builder::new`]. +/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`] +/// or [`Builder::new_current_thread`]. /// /// See function level documentation for details on the various configuration /// settings. /// /// [`build`]: method@Self::build -/// [`Builder::new`]: method@Self::new +/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread +/// [`Builder::new_current_thread`]: method@Self::new_current_thread /// /// # Examples /// @@ -26,9 +27,8 @@ use std::sync::Arc; /// /// fn main() { /// // build runtime -/// let runtime = Builder::new() -/// .threaded_scheduler() -/// .core_threads(4) +/// let runtime = Builder::new_multi_thread() +/// .worker_threads(4) /// .thread_name("my-custom-name") /// .thread_stack_size(3 * 1024 * 1024) /// .build() @@ -38,7 +38,7 @@ use std::sync::Arc; /// } /// ``` pub struct Builder { - /// The task execution model to use. + /// Runtime type kind: Kind, /// Whether or not to enable the I/O driver @@ -50,13 +50,13 @@ pub struct Builder { /// The number of worker threads, used by Runtime. /// /// Only used when not using the current-thread executor. - core_threads: Option<usize>, + worker_threads: Option<usize>, /// Cap on thread usage. max_threads: usize, - /// Name used for threads spawned by the runtime. - pub(super) thread_name: String, + /// Name fn used for threads spawned by the runtime. + pub(super) thread_name: ThreadNameFn, /// Stack size used for threads spawned by the runtime. pub(super) thread_stack_size: Option<usize>, @@ -66,26 +66,43 @@ pub struct Builder { /// To run before each worker thread stops pub(super) before_stop: Option<Callback>, + + /// Customizable keep alive timeout for BlockingPool + pub(super) keep_alive: Option<Duration>, } -#[derive(Debug, Clone, Copy)] -enum Kind { - Shell, - #[cfg(feature = "rt-core")] - Basic, - #[cfg(feature = "rt-threaded")] - ThreadPool, +pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>; + +pub(crate) enum Kind { + CurrentThread, + #[cfg(feature = "rt-multi-thread")] + MultiThread, } impl Builder { + /// Returns a new builder with the current thread scheduler selected. + /// + /// Configuration methods can be chained on the return value. + pub fn new_current_thread() -> Builder { + Builder::new(Kind::CurrentThread) + } + + /// Returns a new builder with the multi thread scheduler selected. + /// + /// Configuration methods can be chained on the return value. + #[cfg(feature = "rt-multi-thread")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + pub fn new_multi_thread() -> Builder { + Builder::new(Kind::MultiThread) + } + /// Returns a new runtime builder initialized with default configuration /// values. /// /// Configuration methods can be chained on the return value. - pub fn new() -> Builder { + pub(crate) fn new(kind: Kind) -> Builder { Builder { - // No task execution by default - kind: Kind::Shell, + kind, // I/O defaults to "off" enable_io: false, @@ -94,12 +111,12 @@ impl Builder { enable_time: false, // Default to lazy auto-detection (one thread per CPU core) - core_threads: None, + worker_threads: None, max_threads: 512, // Default thread name - thread_name: "tokio-runtime-worker".into(), + thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), // Do not set a stack size by default thread_stack_size: None, @@ -107,6 +124,8 @@ impl Builder { // No worker thread callbacks after_start: None, before_stop: None, + + keep_alive: None, } } @@ -121,14 +140,13 @@ impl Builder { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_all() /// .build() /// .unwrap(); /// ``` pub fn enable_all(&mut self) -> &mut Self { - #[cfg(feature = "io-driver")] + #[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))] self.enable_io(); #[cfg(feature = "time")] self.enable_time(); @@ -136,51 +154,68 @@ impl Builder { self } - #[deprecated(note = "In future will be replaced by core_threads method")] - /// Sets the maximum number of worker threads for the `Runtime`'s thread pool. + /// Sets the number of worker threads the `Runtime` will use. + /// + /// This should be a number between 0 and 32,768 though it is advised to + /// keep this value on the smaller side. /// - /// This must be a number between 1 and 32,768 though it is advised to keep - /// this value on the smaller side. + /// # Default /// /// The default value is the number of cores available to the system. - pub fn num_threads(&mut self, val: usize) -> &mut Self { - self.core_threads = Some(val); - self - } - - /// Sets the core number of worker threads for the `Runtime`'s thread pool. /// - /// This should be a number between 1 and 32,768 though it is advised to keep - /// this value on the smaller side. + /// # Panic /// - /// The default value is the number of cores available to the system. + /// When using the `current_thread` runtime this method will panic, since + /// those variants do not allow setting worker thread counts. /// - /// These threads will be always active and running. /// /// # Examples /// + /// ## Multi threaded runtime with 4 threads + /// + /// ``` + /// use tokio::runtime; + /// + /// // This will spawn a work-stealing runtime with 4 worker threads. + /// let rt = runtime::Builder::new_multi_thread() + /// .worker_threads(4) + /// .build() + /// .unwrap(); + /// + /// rt.spawn(async move {}); + /// ``` + /// + /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) + /// /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() - /// .core_threads(4) + /// // Create a runtime that _must_ be driven from a call + /// // to `Runtime::block_on`. + /// let rt = runtime::Builder::new_current_thread() /// .build() /// .unwrap(); + /// + /// // This will run the runtime and future on the current thread + /// rt.block_on(async move {}); /// ``` - pub fn core_threads(&mut self, val: usize) -> &mut Self { - assert_ne!(val, 0, "Core threads cannot be zero"); - self.core_threads = Some(val); + /// + /// # Panic + /// + /// This will panic if `val` is not larger than `0`. + pub fn worker_threads(&mut self, val: usize) -> &mut Self { + assert!(val > 0, "Worker threads cannot be set to 0"); + self.worker_threads = Some(val); self } /// Specifies limit for threads, spawned by the Runtime. /// /// This is number of threads to be used by Runtime, including `core_threads` - /// Having `max_threads` less than `core_threads` results in invalid configuration + /// Having `max_threads` less than `worker_threads` results in invalid configuration /// when building multi-threaded `Runtime`, which would cause a panic. /// - /// Similarly to the `core_threads`, this number should be between 1 and 32,768. + /// Similarly to the `worker_threads`, this number should be between 0 and 32,768. /// /// The default value is 512. /// @@ -189,7 +224,6 @@ impl Builder { /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for /// blocking annotations) as `max_threads - core_threads`. pub fn max_threads(&mut self, val: usize) -> &mut Self { - assert_ne!(val, 0, "Thread limit cannot be zero"); self.max_threads = val; self } @@ -204,13 +238,42 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_name("my-pool") /// .build(); /// # } /// ``` pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { - self.thread_name = val.into(); + let val = val.into(); + self.thread_name = std::sync::Arc::new(move || val.clone()); + self + } + + /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool. + /// + /// The default name fn is `|| "tokio-runtime-worker".into()`. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// # use std::sync::atomic::{AtomicUsize, Ordering}; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_multi_thread() + /// .thread_name_fn(|| { + /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + /// format!("my-pool-{}", id) + /// }) + /// .build(); + /// # } + /// ``` + pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self + where + F: Fn() -> String + Send + Sync + 'static, + { + self.thread_name = std::sync::Arc::new(f); self } @@ -228,8 +291,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let rt = runtime::Builder::new() - /// .threaded_scheduler() + /// let rt = runtime::Builder::new_multi_thread() /// .thread_stack_size(32 * 1024) /// .build(); /// # } @@ -250,8 +312,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let runtime = runtime::Builder::new() - /// .threaded_scheduler() + /// let runtime = runtime::Builder::new_multi_thread() /// .on_thread_start(|| { /// println!("thread started"); /// }) @@ -263,7 +324,7 @@ impl Builder { where F: Fn() + Send + Sync + 'static, { - self.after_start = Some(Arc::new(f)); + self.after_start = Some(std::sync::Arc::new(f)); self } @@ -277,8 +338,7 @@ impl Builder { /// # use tokio::runtime; /// /// # pub fn main() { - /// let runtime = runtime::Builder::new() - /// .threaded_scheduler() + /// let runtime = runtime::Builder::new_multi_thread() /// .on_thread_stop(|| { /// println!("thread stopping"); /// }) @@ -290,56 +350,86 @@ impl Builder { where F: Fn() + Send + Sync + 'static, { - self.before_stop = Some(Arc::new(f)); + self.before_stop = Some(std::sync::Arc::new(f)); self } /// Creates the configured `Runtime`. /// - /// The returned `ThreadPool` instance is ready to spawn tasks. + /// The returned `Runtime` instance is ready to spawn tasks. /// /// # Examples /// /// ``` /// use tokio::runtime::Builder; /// - /// let mut rt = Builder::new().build().unwrap(); + /// let rt = Builder::new_multi_thread().build().unwrap(); /// /// rt.block_on(async { /// println!("Hello from the Tokio runtime"); /// }); /// ``` pub fn build(&mut self) -> io::Result<Runtime> { - match self.kind { - Kind::Shell => self.build_shell_runtime(), - #[cfg(feature = "rt-core")] - Kind::Basic => self.build_basic_runtime(), - #[cfg(feature = "rt-threaded")] - Kind::ThreadPool => self.build_threaded_runtime(), + match &self.kind { + Kind::CurrentThread => self.build_basic_runtime(), + #[cfg(feature = "rt-multi-thread")] + Kind::MultiThread => self.build_threaded_runtime(), } } - fn build_shell_runtime(&mut self) -> io::Result<Runtime> { - use crate::runtime::Kind; + fn get_cfg(&self) -> driver::Cfg { + driver::Cfg { + enable_io: self.enable_io, + enable_time: self.enable_time, + } + } - let clock = time::create_clock(); + /// Sets a custom timeout for a thread in the blocking pool. + /// + /// By default, the timeout for a thread is set to 10 seconds. This can + /// be overriden using .thread_keep_alive(). + /// + /// # Example + /// + /// ``` + /// # use tokio::runtime; + /// # use std::time::Duration; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_multi_thread() + /// .thread_keep_alive(Duration::from_millis(100)) + /// .build(); + /// # } + /// ``` + pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { + self.keep_alive = Some(duration); + self + } + + fn build_basic_runtime(&mut self) -> io::Result<Runtime> { + use crate::runtime::{BasicScheduler, Kind}; - // Create I/O driver - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let spawner = Spawner::Shell; + // And now put a single-threaded scheduler on top of the timer. When + // there are no futures ready to do something, it'll let the timer or + // the reactor to generate some new stimuli for the futures to continue + // in their life. + let scheduler = BasicScheduler::new(driver); + let spawner = Spawner::Basic(scheduler.spawner().clone()); + // Blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Shell(Shell::new(driver)), + kind: Kind::CurrentThread(scheduler), handle: Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }, blocking_pool, @@ -359,7 +449,7 @@ cfg_io_driver! { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_io() /// .build() /// .unwrap(); @@ -382,7 +472,7 @@ cfg_time! { /// ``` /// use tokio::runtime; /// - /// let rt = runtime::Builder::new() + /// let rt = runtime::Builder::new_multi_thread() /// .enable_time() /// .build() /// .unwrap(); @@ -394,85 +484,19 @@ cfg_time! { } } -cfg_rt_core! { +cfg_rt_multi_thread! { impl Builder { - /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread. - /// - /// The executor and all necessary drivers will all be run on the current - /// thread during [`block_on`] calls. - /// - /// See also [the module level documentation][1], which has a section on scheduler - /// types. - /// - /// [1]: index.html#runtime-configurations - /// [`block_on`]: Runtime::block_on - pub fn basic_scheduler(&mut self) -> &mut Self { - self.kind = Kind::Basic; - self - } - - fn build_basic_runtime(&mut self) -> io::Result<Runtime> { - use crate::runtime::{BasicScheduler, Kind}; - - let clock = time::create_clock(); - - // Create I/O driver - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); - - // And now put a single-threaded scheduler on top of the timer. When - // there are no futures ready to do something, it'll let the timer or - // the reactor to generate some new stimuli for the futures to continue - // in their life. - let scheduler = BasicScheduler::new(driver); - let spawner = Spawner::Basic(scheduler.spawner().clone()); - - // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); - let blocking_spawner = blocking_pool.spawner().clone(); - - Ok(Runtime { - kind: Kind::Basic(scheduler), - handle: Handle { - spawner, - io_handle, - time_handle, - clock, - blocking_spawner, - }, - blocking_pool, - }) - } - } -} - -cfg_rt_threaded! { - impl Builder { - /// Sets runtime to use a multi-threaded scheduler for executing tasks. - /// - /// See also [the module level documentation][1], which has a section on scheduler - /// types. - /// - /// [1]: index.html#runtime-configurations - pub fn threaded_scheduler(&mut self) -> &mut Self { - self.kind = Kind::ThreadPool; - self - } - fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { use crate::loom::sys::num_cpus; use crate::runtime::{Kind, ThreadPool}; use crate::runtime::park::Parker; use std::cmp; - let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); + let core_threads = self.worker_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); - let clock = time::create_clock(); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); @@ -483,14 +507,16 @@ cfg_rt_threaded! { // Create the runtime handle let handle = Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }; // Spawn the thread pool workers - handle.enter(|| launch.launch()); + let _enter = crate::runtime::context::enter(handle.clone()); + launch.launch(); Ok(Runtime { kind: Kind::ThreadPool(scheduler), @@ -501,19 +527,15 @@ cfg_rt_threaded! { } } -impl Default for Builder { - fn default() -> Self { - Self::new() - } -} - impl fmt::Debug for Builder { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Builder") - .field("kind", &self.kind) - .field("core_threads", &self.core_threads) + .field("worker_threads", &self.worker_threads) .field("max_threads", &self.max_threads) - .field("thread_name", &self.thread_name) + .field( + "thread_name", + &"<dyn Fn() -> String + Send + Sync + 'static>", + ) .field("thread_stack_size", &self.thread_stack_size) .field("after_start", &self.after_start.as_ref().map(|_| "...")) .field("before_stop", &self.after_start.as_ref().map(|_| "...")) diff --git a/src/runtime/context.rs b/src/runtime/context.rs index 1b267f4..0817019 100644 --- a/src/runtime/context.rs +++ b/src/runtime/context.rs @@ -12,7 +12,7 @@ pub(crate) fn current() -> Option<Handle> { } cfg_io_driver! { - pub(crate) fn io_handle() -> crate::runtime::io::Handle { + pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.io_handle.clone(), None => Default::default(), @@ -20,8 +20,18 @@ cfg_io_driver! { } } +cfg_signal_internal! { + #[cfg(unix)] + pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => ctx.signal_handle.clone(), + None => Default::default(), + }) + } +} + cfg_time! { - pub(crate) fn time_handle() -> crate::runtime::time::Handle { + pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.time_handle.clone(), None => Default::default(), @@ -29,7 +39,7 @@ cfg_time! { } cfg_test_util! { - pub(crate) fn clock() -> Option<crate::runtime::time::Clock> { + pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => Some(ctx.clock.clone()), None => None, @@ -38,7 +48,7 @@ cfg_time! { } } -cfg_rt_core! { +cfg_rt! { pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => Some(ctx.spawner.clone()), @@ -50,24 +60,20 @@ cfg_rt_core! { /// Set this [`Handle`] as the current active [`Handle`]. /// /// [`Handle`]: Handle -pub(crate) fn enter<F, R>(new: Handle, f: F) -> R -where - F: FnOnce() -> R, -{ - struct DropGuard(Option<Handle>); - - impl Drop for DropGuard { - fn drop(&mut self) { - CONTEXT.with(|ctx| { - *ctx.borrow_mut() = self.0.take(); - }); - } - } - - let _guard = CONTEXT.with(|ctx| { +pub(crate) fn enter(new: Handle) -> EnterGuard { + CONTEXT.with(|ctx| { let old = ctx.borrow_mut().replace(new); - DropGuard(old) - }); + EnterGuard(old) + }) +} + +#[derive(Debug)] +pub(crate) struct EnterGuard(Option<Handle>); - f() +impl Drop for EnterGuard { + fn drop(&mut self) { + CONTEXT.with(|ctx| { + *ctx.borrow_mut() = self.0.take(); + }); + } } diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs new file mode 100644 index 0000000..e89de9d --- /dev/null +++ b/src/runtime/driver.rs @@ -0,0 +1,205 @@ +//! Abstracts out the entire chain of runtime sub-drivers into common types. +use crate::park::thread::ParkThread; +use crate::park::Park; + +use std::io; +use std::time::Duration; + +// ===== io driver ===== + +cfg_io_driver! { + type IoDriver = crate::io::driver::Driver; + type IoStack = crate::park::either::Either<ProcessDriver, ParkThread>; + pub(crate) type IoHandle = Option<crate::io::driver::Handle>; + + fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { + use crate::park::either::Either; + + #[cfg(loom)] + assert!(!enabled); + + let ret = if enabled { + let io_driver = crate::io::driver::Driver::new()?; + let io_handle = io_driver.handle(); + + let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; + let process_driver = create_process_driver(signal_driver)?; + + (Either::A(process_driver), Some(io_handle), signal_handle) + } else { + (Either::B(ParkThread::new()), Default::default(), Default::default()) + }; + + Ok(ret) + } +} + +cfg_not_io_driver! { + pub(crate) type IoHandle = (); + type IoStack = ParkThread; + + fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { + Ok((ParkThread::new(), Default::default(), Default::default())) + } +} + +// ===== signal driver ===== + +macro_rules! cfg_signal_internal_and_unix { + ($($item:item)*) => { + #[cfg(unix)] + cfg_signal_internal! { $($item)* } + } +} + +cfg_signal_internal_and_unix! { + type SignalDriver = crate::signal::unix::driver::Driver; + pub(crate) type SignalHandle = Option<crate::signal::unix::driver::Handle>; + + fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + let driver = crate::signal::unix::driver::Driver::new(io_driver)?; + let handle = driver.handle(); + Ok((driver, Some(handle))) + } +} + +cfg_not_signal_internal! { + pub(crate) type SignalHandle = (); + + cfg_io_driver! { + type SignalDriver = IoDriver; + + fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + Ok((io_driver, ())) + } + } +} + +// ===== process driver ===== + +cfg_process_driver! { + type ProcessDriver = crate::process::unix::driver::Driver; + + fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> { + crate::process::unix::driver::Driver::new(signal_driver) + } +} + +cfg_not_process_driver! { + cfg_io_driver! { + type ProcessDriver = SignalDriver; + + fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> { + Ok(signal_driver) + } + } +} + +// ===== time driver ===== + +cfg_time! { + type TimeDriver = crate::park::either::Either<crate::time::driver::Driver<IoStack>, IoStack>; + + pub(crate) type Clock = crate::time::Clock; + pub(crate) type TimeHandle = Option<crate::time::driver::Handle>; + + fn create_clock() -> Clock { + crate::time::Clock::new() + } + + fn create_time_driver( + enable: bool, + io_stack: IoStack, + clock: Clock, + ) -> (TimeDriver, TimeHandle) { + use crate::park::either::Either; + + if enable { + let driver = crate::time::driver::Driver::new(io_stack, clock); + let handle = driver.handle(); + + (Either::A(driver), Some(handle)) + } else { + (Either::B(io_stack), None) + } + } +} + +cfg_not_time! { + type TimeDriver = IoStack; + + pub(crate) type Clock = (); + pub(crate) type TimeHandle = (); + + fn create_clock() -> Clock { + () + } + + fn create_time_driver( + _enable: bool, + io_stack: IoStack, + _clock: Clock, + ) -> (TimeDriver, TimeHandle) { + (io_stack, ()) + } +} + +// ===== runtime driver ===== + +#[derive(Debug)] +pub(crate) struct Driver { + inner: TimeDriver, +} + +pub(crate) struct Resources { + pub(crate) io_handle: IoHandle, + pub(crate) signal_handle: SignalHandle, + pub(crate) time_handle: TimeHandle, + pub(crate) clock: Clock, +} + +pub(crate) struct Cfg { + pub(crate) enable_io: bool, + pub(crate) enable_time: bool, +} + +impl Driver { + pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { + let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; + + let clock = create_clock(); + let (time_driver, time_handle) = + create_time_driver(cfg.enable_time, io_stack, clock.clone()); + + Ok(( + Self { inner: time_driver }, + Resources { + io_handle, + signal_handle, + time_handle, + clock, + }, + )) + } +} + +impl Park for Driver { + type Unpark = <TimeDriver as Park>::Unpark; + type Error = <TimeDriver as Park>::Error; + + fn unpark(&self) -> Self::Unpark { + self.inner.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park() + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration) + } + + fn shutdown(&mut self) { + self.inner.shutdown() + } +} diff --git a/src/runtime/enter.rs b/src/runtime/enter.rs index 56a7c57..4dd8dd0 100644 --- a/src/runtime/enter.rs +++ b/src/runtime/enter.rs @@ -4,8 +4,8 @@ use std::marker::PhantomData; #[derive(Debug, Clone, Copy)] pub(crate) enum EnterContext { + #[cfg_attr(not(feature = "rt"), allow(dead_code))] Entered { - #[allow(dead_code)] allow_blocking: bool, }, NotEntered, @@ -13,11 +13,7 @@ pub(crate) enum EnterContext { impl EnterContext { pub(crate) fn is_entered(self) -> bool { - if let EnterContext::Entered { .. } = self { - true - } else { - false - } + matches!(self, EnterContext::Entered { .. }) } } @@ -28,32 +24,38 @@ pub(crate) struct Enter { _p: PhantomData<RefCell<()>>, } -/// Marks the current thread as being within the dynamic extent of an -/// executor. -pub(crate) fn enter(allow_blocking: bool) -> Enter { - if let Some(enter) = try_enter(allow_blocking) { - return enter; - } +cfg_rt! { + use crate::park::thread::ParkError; - panic!( - "Cannot start a runtime from within a runtime. This happens \ - because a function (like `block_on`) attempted to block the \ - current thread while the thread is being used to drive \ - asynchronous tasks." - ); -} + use std::time::Duration; -/// Tries to enter a runtime context, returns `None` if already in a runtime -/// context. -pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> { - ENTERED.with(|c| { - if c.get().is_entered() { - None - } else { - c.set(EnterContext::Entered { allow_blocking }); - Some(Enter { _p: PhantomData }) + /// Marks the current thread as being within the dynamic extent of an + /// executor. + pub(crate) fn enter(allow_blocking: bool) -> Enter { + if let Some(enter) = try_enter(allow_blocking) { + return enter; } - }) + + panic!( + "Cannot start a runtime from within a runtime. This happens \ + because a function (like `block_on`) attempted to block the \ + current thread while the thread is being used to drive \ + asynchronous tasks." + ); + } + + /// Tries to enter a runtime context, returns `None` if already in a runtime + /// context. + pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> { + ENTERED.with(|c| { + if c.get().is_entered() { + None + } else { + c.set(EnterContext::Entered { allow_blocking }); + Some(Enter { _p: PhantomData }) + } + }) + } } // Forces the current "entered" state to be cleared while the closure @@ -63,115 +65,92 @@ pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> { // // This is hidden for a reason. Do not use without fully understanding // executors. Misuing can easily cause your program to deadlock. -#[cfg(all(feature = "rt-threaded", feature = "blocking"))] -pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R { - // Reset in case the closure panics - struct Reset(EnterContext); - impl Drop for Reset { - fn drop(&mut self) { - ENTERED.with(|c| { - assert!(!c.get().is_entered(), "closure claimed permanent executor"); - c.set(self.0); - }); +cfg_rt_multi_thread! { + pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset(EnterContext); + impl Drop for Reset { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(!c.get().is_entered(), "closure claimed permanent executor"); + c.set(self.0); + }); + } } - } - let was = ENTERED.with(|c| { - let e = c.get(); - assert!(e.is_entered(), "asked to exit when not entered"); - c.set(EnterContext::NotEntered); - e - }); + let was = ENTERED.with(|c| { + let e = c.get(); + assert!(e.is_entered(), "asked to exit when not entered"); + c.set(EnterContext::NotEntered); + e + }); - let _reset = Reset(was); - // dropping _reset after f() will reset ENTERED - f() + let _reset = Reset(was); + // dropping _reset after f() will reset ENTERED + f() + } } -cfg_rt_core! { - cfg_rt_util! { - /// Disallow blocking in the current runtime context until the guard is dropped. - pub(crate) fn disallow_blocking() -> DisallowBlockingGuard { - let reset = ENTERED.with(|c| { - if let EnterContext::Entered { - allow_blocking: true, - } = c.get() - { - c.set(EnterContext::Entered { - allow_blocking: false, - }); - true - } else { - false - } - }); - DisallowBlockingGuard(reset) - } +cfg_rt! { + /// Disallow blocking in the current runtime context until the guard is dropped. + pub(crate) fn disallow_blocking() -> DisallowBlockingGuard { + let reset = ENTERED.with(|c| { + if let EnterContext::Entered { + allow_blocking: true, + } = c.get() + { + c.set(EnterContext::Entered { + allow_blocking: false, + }); + true + } else { + false + } + }); + DisallowBlockingGuard(reset) + } - pub(crate) struct DisallowBlockingGuard(bool); - impl Drop for DisallowBlockingGuard { - fn drop(&mut self) { - if self.0 { - // XXX: Do we want some kind of assertion here, or is "best effort" okay? - ENTERED.with(|c| { - if let EnterContext::Entered { - allow_blocking: false, - } = c.get() - { - c.set(EnterContext::Entered { - allow_blocking: true, - }); - } - }) - } + pub(crate) struct DisallowBlockingGuard(bool); + impl Drop for DisallowBlockingGuard { + fn drop(&mut self) { + if self.0 { + // XXX: Do we want some kind of assertion here, or is "best effort" okay? + ENTERED.with(|c| { + if let EnterContext::Entered { + allow_blocking: false, + } = c.get() + { + c.set(EnterContext::Entered { + allow_blocking: true, + }); + } + }) } } } } -cfg_rt_threaded! { - cfg_blocking! { - /// Returns true if in a runtime context. - pub(crate) fn context() -> EnterContext { - ENTERED.with(|c| c.get()) - } +cfg_rt_multi_thread! { + /// Returns true if in a runtime context. + pub(crate) fn context() -> EnterContext { + ENTERED.with(|c| c.get()) } } -cfg_block_on! { +cfg_rt! { impl Enter { /// Blocks the thread on the specified future, returning the value with /// which that future completes. - pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError> + pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, ParkError> where F: std::future::Future, { - use crate::park::{CachedParkThread, Park}; - use std::task::Context; - use std::task::Poll::Ready; + use crate::park::thread::CachedParkThread; let mut park = CachedParkThread::new(); - let waker = park.get_unpark()?.into_waker(); - let mut cx = Context::from_waker(&waker); - - pin!(f); - - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return Ok(v); - } - - park.park()?; - } + park.block_on(f) } - } -} -cfg_blocking_impl! { - use crate::park::ParkError; - use std::time::Duration; - - impl Enter { /// Blocks the thread on the specified future for **at most** `timeout` /// /// If the future completes before `timeout`, the result is returned. If @@ -180,7 +159,8 @@ cfg_blocking_impl! { where F: std::future::Future, { - use crate::park::{CachedParkThread, Park}; + use crate::park::Park; + use crate::park::thread::CachedParkThread; use std::task::Context; use std::task::Poll::Ready; use std::time::Instant; diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs index 0716a7f..b1e8d8f 100644 --- a/src/runtime/handle.rs +++ b/src/runtime/handle.rs @@ -1,16 +1,6 @@ -use crate::runtime::{blocking, context, io, time, Spawner}; -use std::{error, fmt}; - -cfg_blocking! { - use crate::runtime::task; - use crate::runtime::blocking::task::BlockingTask; -} - -cfg_rt_core! { - use crate::task::JoinHandle; - - use std::future::Future; -} +use crate::runtime::blocking::task::BlockingTask; +use crate::runtime::task::{self, JoinHandle}; +use crate::runtime::{blocking, driver, Spawner}; /// Handle to the runtime. /// @@ -19,353 +9,56 @@ cfg_rt_core! { /// /// [`Runtime::handle`]: crate::runtime::Runtime::handle() #[derive(Debug, Clone)] -pub struct Handle { +pub(crate) struct Handle { pub(super) spawner: Spawner, /// Handles to the I/O drivers - pub(super) io_handle: io::Handle, + pub(super) io_handle: driver::IoHandle, + + /// Handles to the signal drivers + pub(super) signal_handle: driver::SignalHandle, /// Handles to the time drivers - pub(super) time_handle: time::Handle, + pub(super) time_handle: driver::TimeHandle, /// Source of `Instant::now()` - pub(super) clock: time::Clock, + pub(super) clock: driver::Clock, /// Blocking pool spawner pub(super) blocking_spawner: blocking::Spawner, } impl Handle { - /// Enter the runtime context. This allows you to construct types that must - /// have an executor available on creation such as [`Delay`] or [`TcpStream`]. - /// It will also allow you to call methods such as [`tokio::spawn`]. - /// - /// This function is also available as [`Runtime::enter`]. - /// - /// [`Delay`]: struct@crate::time::Delay - /// [`TcpStream`]: struct@crate::net::TcpStream - /// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter - /// [`tokio::spawn`]: fn@crate::spawn - /// - /// # Example - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// fn function_that_spawns(msg: String) { - /// // Had we not used `handle.enter` below, this would panic. - /// tokio::spawn(async move { - /// println!("{}", msg); - /// }); - /// } - /// - /// fn main() { - /// let rt = Runtime::new().unwrap(); - /// let handle = rt.handle().clone(); - /// - /// let s = "Hello World!".to_string(); - /// - /// // By entering the context, we tie `tokio::spawn` to this executor. - /// handle.enter(|| function_that_spawns(s)); - /// } - /// ``` - pub fn enter<F, R>(&self, f: F) -> R + // /// Enter the runtime context. This allows you to construct types that must + // /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. + // /// It will also allow you to call methods such as [`tokio::spawn`]. + // pub(crate) fn enter<F, R>(&self, f: F) -> R + // where + // F: FnOnce() -> R, + // { + // context::enter(self.clone(), f) + // } + + /// Run the provided function on an executor dedicated to blocking operations. + pub(crate) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where - F: FnOnce() -> R, + F: FnOnce() -> R + Send + 'static, { - context::enter(self.clone(), f) - } - - /// Returns a `Handle` view over the currently running `Runtime` - /// - /// # Panic - /// - /// This will panic if called outside the context of a Tokio runtime. That means that you must - /// call this on one of the threads **being run by the runtime**. Calling this from within a - /// thread created by `std::thread::spawn` (for example) will cause a panic. - /// - /// # Examples - /// - /// This can be used to obtain the handle of the surrounding runtime from an async - /// block or function running on that runtime. - /// - /// ``` - /// # use std::thread; - /// # use tokio::runtime::Runtime; - /// # fn dox() { - /// # let rt = Runtime::new().unwrap(); - /// # rt.spawn(async { - /// use tokio::runtime::Handle; - /// - /// // Inside an async block or function. - /// let handle = Handle::current(); - /// handle.spawn(async { - /// println!("now running in the existing Runtime"); - /// }); - /// - /// # let handle = - /// thread::spawn(move || { - /// // Notice that the handle is created outside of this thread and then moved in - /// handle.block_on(async { /* ... */ }) - /// // This next line would cause a panic - /// // let handle2 = Handle::current(); - /// }); - /// # handle.join().unwrap(); - /// # }); - /// # } - /// ``` - pub fn current() -> Self { - context::current().expect("not currently running on the Tokio runtime.") - } - - /// Returns a Handle view over the currently running Runtime - /// - /// Returns an error if no Runtime has been started - /// - /// Contrary to `current`, this never panics - pub fn try_current() -> Result<Self, TryCurrentError> { - context::current().ok_or(TryCurrentError(())) + #[cfg(feature = "tracing")] + let func = { + let span = tracing::trace_span!( + target: "tokio::task", + "task", + kind = %"blocking", + function = %std::any::type_name::<F>(), + ); + move || { + let _g = span.enter(); + func() + } + }; + let (task, handle) = task::joinable(BlockingTask::new(func)); + let _ = self.blocking_spawner.spawn(task, &self); + handle } } - -cfg_rt_core! { - impl Handle { - /// Spawns a future onto the Tokio runtime. - /// - /// This spawns the given future onto the runtime's executor, usually a - /// thread pool. The thread pool is then responsible for polling the future - /// until it completes. - /// - /// See [module level][mod] documentation for more details. - /// - /// [mod]: index.html - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// let handle = rt.handle(); - /// - /// // Spawn a future onto the runtime - /// handle.spawn(async { - /// println!("now running on a worker thread"); - /// }); - /// # } - /// ``` - /// - /// # Panics - /// - /// This function will not panic unless task execution is disabled on the - /// executor. This can only happen if the runtime was built using - /// [`Builder`] without picking either [`basic_scheduler`] or - /// [`threaded_scheduler`]. - /// - /// [`Builder`]: struct@crate::runtime::Builder - /// [`threaded_scheduler`]: fn@crate::runtime::Builder::threaded_scheduler - /// [`basic_scheduler`]: fn@crate::runtime::Builder::basic_scheduler - pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawner.spawn(future) - } - - /// Run a future to completion on the Tokio runtime from a synchronous - /// context. - /// - /// This runs the given future on the runtime, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers which - /// the future spawns internally will be executed on the runtime. - /// - /// If the provided executor currently has no active core thread, this - /// function might hang until a core thread is added. This is not a - /// concern when using the [threaded scheduler], as it always has active - /// core threads, but if you use the [basic scheduler], some other - /// thread must currently be inside a call to [`Runtime::block_on`]. - /// See also [the module level documentation][1], which has a section on - /// scheduler types. - /// - /// This method may not be called from an asynchronous context. - /// - /// [threaded scheduler]: fn@crate::runtime::Builder::threaded_scheduler - /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler - /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on - /// [1]: index.html#runtime-configurations - /// - /// # Panics - /// - /// This function panics if the provided future panics, or if called - /// within an asynchronous execution context. - /// - /// # Examples - /// - /// Using `block_on` with the [threaded scheduler]. - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use std::thread; - /// - /// // Create the runtime. - /// // - /// // If the rt-threaded feature is enabled, this creates a threaded - /// // scheduler by default. - /// let rt = Runtime::new().unwrap(); - /// let handle = rt.handle().clone(); - /// - /// // Use the runtime from another thread. - /// let th = thread::spawn(move || { - /// // Execute the future, blocking the current thread until completion. - /// // - /// // This example uses the threaded scheduler, so no concurrent call to - /// // `rt.block_on` is required. - /// handle.block_on(async { - /// println!("hello"); - /// }); - /// }); - /// - /// th.join().unwrap(); - /// ``` - /// - /// Using the [basic scheduler] requires a concurrent call to - /// [`Runtime::block_on`]: - /// - /// [threaded scheduler]: fn@crate::runtime::Builder::threaded_scheduler - /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler - /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on - /// - /// ``` - /// use tokio::runtime::Builder; - /// use tokio::sync::oneshot; - /// use std::thread; - /// - /// // Create the runtime. - /// let mut rt = Builder::new() - /// .enable_all() - /// .basic_scheduler() - /// .build() - /// .unwrap(); - /// - /// let handle = rt.handle().clone(); - /// - /// // Signal main thread when task has finished. - /// let (send, recv) = oneshot::channel(); - /// - /// // Use the runtime from another thread. - /// let th = thread::spawn(move || { - /// // Execute the future, blocking the current thread until completion. - /// handle.block_on(async { - /// send.send("done").unwrap(); - /// }); - /// }); - /// - /// // The basic scheduler is used, so the thread above might hang if we - /// // didn't call block_on on the rt too. - /// rt.block_on(async { - /// assert_eq!(recv.await.unwrap(), "done"); - /// }); - /// # th.join().unwrap(); - /// ``` - /// - pub fn block_on<F: Future>(&self, future: F) -> F::Output { - self.enter(|| { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).expect("failed to park thread") - }) - } - } -} - -cfg_blocking! { - impl Handle { - /// Runs the provided closure on a thread where blocking is acceptable. - /// - /// In general, issuing a blocking call or performing a lot of compute in a - /// future without yielding is not okay, as it may prevent the executor from - /// driving other futures forward. This function runs the provided closure - /// on a thread dedicated to blocking operations. See the [CPU-bound tasks - /// and blocking code][blocking] section for more information. - /// - /// Tokio will spawn more blocking threads when they are requested through - /// this function until the upper limit configured on the [`Builder`] is - /// reached. This limit is very large by default, because `spawn_blocking` is - /// often used for various kinds of IO operations that cannot be performed - /// asynchronously. When you run CPU-bound code using `spawn_blocking`, you - /// should keep this large upper limit in mind; to run your CPU-bound - /// computations on only a few threads, you should use a separate thread - /// pool such as [rayon] rather than configuring the number of blocking - /// threads. - /// - /// This function is intended for non-async operations that eventually - /// finish on their own. If you want to spawn an ordinary thread, you should - /// use [`thread::spawn`] instead. - /// - /// Closures spawned using `spawn_blocking` cannot be cancelled. When you - /// shut down the executor, it will wait indefinitely for all blocking - /// operations to finish. You can use [`shutdown_timeout`] to stop waiting - /// for them after a certain timeout. Be aware that this will still not - /// cancel the tasks — they are simply allowed to keep running after the - /// method returns. - /// - /// Note that if you are using the [basic scheduler], this function will - /// still spawn additional threads for blocking operations. The basic - /// scheduler's single thread is only used for asynchronous code. - /// - /// [`Builder`]: struct@crate::runtime::Builder - /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code - /// [rayon]: https://docs.rs/rayon - /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler - /// [`thread::spawn`]: fn@std::thread::spawn - /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{ - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// let handle = rt.handle(); - /// - /// let res = handle.spawn_blocking(move || { - /// // do some compute-heavy work or call synchronous code - /// "done computing" - /// }).await?; - /// - /// assert_eq!(res, "done computing"); - /// # Ok(()) - /// # } - /// ``` - pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R> - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let (task, handle) = task::joinable(BlockingTask::new(f)); - let _ = self.blocking_spawner.spawn(task, self); - handle - } - } -} - -/// Error returned by `try_current` when no Runtime has been started -pub struct TryCurrentError(()); - -impl fmt::Debug for TryCurrentError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryCurrentError").finish() - } -} - -impl fmt::Display for TryCurrentError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("no tokio Runtime has been initialized") - } -} - -impl error::Error for TryCurrentError {} diff --git a/src/runtime/io.rs b/src/runtime/io.rs deleted file mode 100644 index 6a0953a..0000000 --- a/src/runtime/io.rs +++ /dev/null @@ -1,63 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O -//! driver. When the `time` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -/// Re-exported for convenience. -pub(crate) use std::io::Result; - -pub(crate) use variant::*; - -#[cfg(feature = "io-driver")] -mod variant { - use crate::io::driver; - use crate::park::{Either, ParkThread}; - - use std::io; - - /// The driver value the runtime passes to the `timer` layer. - /// - /// When the `io-driver` feature is enabled, this is the "real" I/O driver - /// backed by Mio. Without the `io-driver` feature, this is a thread parker - /// backed by a condition variable. - pub(crate) type Driver = Either<driver::Driver, ParkThread>; - - /// The handle the runtime stores for future use. - /// - /// When the `io-driver` feature is **not** enabled, this is `()`. - pub(crate) type Handle = Option<driver::Handle>; - - pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> { - #[cfg(loom)] - assert!(!enable); - - if enable { - let driver = driver::Driver::new()?; - let handle = driver.handle(); - - Ok((Either::A(driver), Some(handle))) - } else { - let driver = ParkThread::new(); - Ok((Either::B(driver), None)) - } - } -} - -#[cfg(not(feature = "io-driver"))] -mod variant { - use crate::park::ParkThread; - - use std::io; - - /// I/O is not enabled, use a condition variable based parker - pub(crate) type Driver = ParkThread; - - /// There is no handle - pub(crate) type Handle = (); - - pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> { - let driver = ParkThread::new(); - - Ok((driver, ())) - } -} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 300a146..be4aa38 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,8 +1,7 @@ //! The Tokio runtime. //! -//! Unlike other Rust programs, asynchronous applications require -//! runtime support. In particular, the following runtime services are -//! necessary: +//! Unlike other Rust programs, asynchronous applications require runtime +//! support. In particular, the following runtime services are necessary: //! //! * An **I/O event loop**, called the driver, which drives I/O resources and //! dispatches I/O events to tasks that depend on them. @@ -10,14 +9,14 @@ //! * A **timer** for scheduling work to run after a set period of time. //! //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing -//! them to be started, shut down, and configured together. However, most -//! applications won't need to use [`Runtime`] directly. Instead, they can -//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under -//! the hood. +//! them to be started, shut down, and configured together. However, often it is +//! not required to configure a [`Runtime`] manually, and user may just use the +//! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood. //! //! # Usage //! -//! Most applications will use the [`tokio::main`] attribute macro. +//! When no fine tuning is required, the [`tokio::main`] attribute macro can be +//! used. //! //! ```no_run //! use tokio::net::TcpListener; @@ -25,7 +24,7 @@ //! //! #[tokio::main] //! async fn main() -> Result<(), Box<dyn std::error::Error>> { -//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; @@ -69,11 +68,11 @@ //! //! fn main() -> Result<(), Box<dyn std::error::Error>> { //! // Create the runtime -//! let mut rt = Runtime::new()?; +//! let rt = Runtime::new()?; //! //! // Spawn the root task //! rt.block_on(async { -//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! let listener = TcpListener::bind("127.0.0.1:8080").await?; //! //! loop { //! let (mut socket, _) = listener.accept().await?; @@ -111,48 +110,38 @@ //! applications. The [runtime builder] or `#[tokio::main]` attribute may be //! used to select which scheduler to use. //! -//! #### Basic Scheduler +//! #### Multi-Thread Scheduler //! -//! The basic scheduler provides a _single-threaded_ future executor. All tasks -//! will be created and executed on the current thread. The basic scheduler -//! requires the `rt-core` feature flag, and can be selected using the -//! [`Builder::basic_scheduler`] method: +//! The multi-thread scheduler executes futures on a _thread pool_, using a +//! work-stealing strategy. By default, it will start a worker thread for each +//! CPU core available on the system. This tends to be the ideal configurations +//! for most applications. The multi-thread scheduler requires the `rt-multi-thread` +//! feature flag, and is selected by default: //! ``` //! use tokio::runtime; //! //! # fn main() -> Result<(), Box<dyn std::error::Error>> { -//! let basic_rt = runtime::Builder::new() -//! .basic_scheduler() -//! .build()?; +//! let threaded_rt = runtime::Runtime::new()?; //! # Ok(()) } //! ``` //! -//! If the `rt-core` feature is enabled and `rt-threaded` is not, -//! [`Runtime::new`] will return a basic scheduler runtime by default. +//! Most applications should use the multi-thread scheduler, except in some +//! niche use-cases, such as when running only a single thread is required. //! -//! #### Threaded Scheduler +//! #### Current-Thread Scheduler //! -//! The threaded scheduler executes futures on a _thread pool_, using a -//! work-stealing strategy. By default, it will start a worker thread for each -//! CPU core available on the system. This tends to be the ideal configurations -//! for most applications. The threaded scheduler requires the `rt-threaded` feature -//! flag, and can be selected using the [`Builder::threaded_scheduler`] method: +//! The current-thread scheduler provides a _single-threaded_ future executor. +//! All tasks will be created and executed on the current thread. This requires +//! the `rt` feature flag. //! ``` //! use tokio::runtime; //! //! # fn main() -> Result<(), Box<dyn std::error::Error>> { -//! let threaded_rt = runtime::Builder::new() -//! .threaded_scheduler() +//! let basic_rt = runtime::Builder::new_current_thread() //! .build()?; //! # Ok(()) } //! ``` //! -//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a -//! threaded scheduler runtime by default. -//! -//! Most applications should use the threaded scheduler, except in some niche -//! use-cases, such as when running only a single thread is required. -//! //! #### Resource drivers //! //! When configuring a runtime by hand, no resource drivers are enabled by @@ -164,8 +153,8 @@ //! ## Lifetime of spawned threads //! //! The runtime may spawn threads depending on its configuration and usage. The -//! threaded scheduler spawns threads to schedule tasks and calls to -//! `spawn_blocking` spawn threads to run blocking operations. +//! multi-thread scheduler spawns threads to schedule tasks and for `spawn_blocking` +//! calls. //! //! While the `Runtime` is active, threads may shutdown after periods of being //! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown. @@ -188,394 +177,380 @@ #[macro_use] mod tests; -pub(crate) mod context; +pub(crate) mod enter; + +pub(crate) mod task; -cfg_rt_core! { +cfg_rt! { mod basic_scheduler; use basic_scheduler::BasicScheduler; - pub(crate) mod task; -} - -mod blocking; -use blocking::BlockingPool; + mod blocking; + use blocking::BlockingPool; + pub(crate) use blocking::spawn_blocking; -cfg_blocking_impl! { - #[allow(unused_imports)] - pub(crate) use blocking::{spawn_blocking, try_spawn_blocking}; -} + mod builder; + pub use self::builder::Builder; -mod builder; -pub use self::builder::Builder; + pub(crate) mod context; + pub(crate) mod driver; -pub(crate) mod enter; -use self::enter::enter; + use self::enter::enter; -mod handle; -pub use self::handle::{Handle, TryCurrentError}; + mod handle; + use handle::Handle; -mod io; + mod spawner; + use self::spawner::Spawner; +} -cfg_rt_threaded! { +cfg_rt_multi_thread! { mod park; use park::Parker; } -mod shell; -use self::shell::Shell; - -mod spawner; -use self::spawner::Spawner; - -mod time; - -cfg_rt_threaded! { +cfg_rt_multi_thread! { mod queue; pub(crate) mod thread_pool; use self::thread_pool::ThreadPool; } -cfg_rt_core! { +cfg_rt! { use crate::task::JoinHandle; -} - -use std::future::Future; -use std::time::Duration; - -/// The Tokio runtime. -/// -/// The runtime provides an I/O driver, task scheduler, [timer], and blocking -/// pool, necessary for running asynchronous tasks. -/// -/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, -/// most users will use the `#[tokio::main]` annotation on their entry point instead. -/// -/// See [module level][mod] documentation for more details. -/// -/// # Shutdown -/// -/// Shutting down the runtime is done by dropping the value. The current thread -/// will block until the shut down operation has completed. -/// -/// * Drain any scheduled work queues. -/// * Drop any futures that have not yet completed. -/// * Drop the reactor. -/// -/// Once the reactor has dropped, any outstanding I/O resources bound to -/// that reactor will no longer function. Calling any method on them will -/// result in an error. -/// -/// [timer]: crate::time -/// [mod]: index.html -/// [`new`]: method@Self::new -/// [`Builder`]: struct@Builder -/// [`tokio::run`]: fn@run -#[derive(Debug)] -pub struct Runtime { - /// Task executor - kind: Kind, - - /// Handle to runtime, also contains driver handles - handle: Handle, - - /// Blocking pool handle, used to signal shutdown - blocking_pool: BlockingPool, -} - -/// The runtime executor is either a thread-pool or a current-thread executor. -#[derive(Debug)] -enum Kind { - /// Not able to execute concurrent tasks. This variant is mostly used to get - /// access to the driver handles. - Shell(Shell), - /// Execute all tasks on the current-thread. - #[cfg(feature = "rt-core")] - Basic(BasicScheduler<time::Driver>), + use std::future::Future; + use std::time::Duration; - /// Execute tasks across multiple threads. - #[cfg(feature = "rt-threaded")] - ThreadPool(ThreadPool), -} - -/// After thread starts / before thread stops -type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; - -impl Runtime { - /// Create a new runtime instance with default configuration values. + /// The Tokio runtime. /// - /// This results in a scheduler, I/O driver, and time driver being - /// initialized. The type of scheduler used depends on what feature flags - /// are enabled: if the `rt-threaded` feature is enabled, the [threaded - /// scheduler] is used, while if only the `rt-core` feature is enabled, the - /// [basic scheduler] is used instead. + /// The runtime provides an I/O driver, task scheduler, [timer], and + /// blocking pool, necessary for running asynchronous tasks. /// - /// If the threaded scheduler is selected, it will not spawn - /// any worker threads until it needs to, i.e. tasks are scheduled to run. - /// - /// Most applications will not need to call this function directly. Instead, - /// they will use the [`#[tokio::main]` attribute][main]. When more complex - /// configuration is necessary, the [runtime builder] may be used. + /// Instances of `Runtime` can be created using [`new`], or [`Builder`]. + /// However, most users will use the `#[tokio::main]` annotation on their + /// entry point instead. /// /// See [module level][mod] documentation for more details. /// - /// # Examples - /// - /// Creating a new `Runtime` with default configuration values. + /// # Shutdown /// - /// ``` - /// use tokio::runtime::Runtime; + /// Shutting down the runtime is done by dropping the value. The current + /// thread will block until the shut down operation has completed. /// - /// let rt = Runtime::new() - /// .unwrap(); + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. /// - /// // Use the runtime... - /// ``` + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. /// - /// [mod]: index.html - /// [main]: ../attr.main.html - /// [threaded scheduler]: index.html#threaded-scheduler - /// [basic scheduler]: index.html#basic-scheduler - /// [runtime builder]: crate::runtime::Builder - pub fn new() -> io::Result<Runtime> { - #[cfg(feature = "rt-threaded")] - let ret = Builder::new().threaded_scheduler().enable_all().build(); - - #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))] - let ret = Builder::new().basic_scheduler().enable_all().build(); - - #[cfg(not(feature = "rt-core"))] - let ret = Builder::new().enable_all().build(); - - ret - } - - /// Spawn a future onto the Tokio runtime. + /// # Sharing /// - /// This spawns the given future onto the runtime's executor, usually a - /// thread pool. The thread pool is then responsible for polling the future - /// until it completes. + /// The Tokio runtime implements `Sync` and `Send` to allow you to wrap it + /// in a `Arc`. Most fn take `&self` to allow you to call them concurrently + /// accross multiple threads. /// - /// See [module level][mod] documentation for more details. + /// Calls to `shutdown` and `shutdown_timeout` require exclusive ownership of + /// the runtime type and this can be achieved via `Arc::try_unwrap` when only + /// one strong count reference is left over. /// + /// [timer]: crate::time /// [mod]: index.html - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// - /// // Spawn a future onto the runtime - /// rt.spawn(async { - /// println!("now running on a worker thread"); - /// }); - /// # } - /// ``` - /// - /// # Panics - /// - /// This function will not panic unless task execution is disabled on the - /// executor. This can only happen if the runtime was built using - /// [`Builder`] without picking either [`basic_scheduler`] or - /// [`threaded_scheduler`]. - /// + /// [`new`]: method@Self::new /// [`Builder`]: struct@Builder - /// [`threaded_scheduler`]: fn@Builder::threaded_scheduler - /// [`basic_scheduler`]: fn@Builder::basic_scheduler - #[cfg(feature = "rt-core")] - pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - match &self.kind { - Kind::Shell(_) => panic!("task execution disabled"), - #[cfg(feature = "rt-threaded")] - Kind::ThreadPool(exec) => exec.spawn(future), - Kind::Basic(exec) => exec.spawn(future), - } - } + #[derive(Debug)] + pub struct Runtime { + /// Task executor + kind: Kind, - /// Run a future to completion on the Tokio runtime. This is the runtime's - /// entry point. - /// - /// This runs the given future on the runtime, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers which - /// the future spawns internally will be executed on the runtime. - /// - /// `&mut` is required as calling `block_on` **may** result in advancing the - /// state of the runtime. The details depend on how the runtime is - /// configured. [`runtime::Handle::block_on`][handle] provides a version - /// that takes `&self`. - /// - /// This method may not be called from an asynchronous context. - /// - /// # Panics - /// - /// This function panics if the provided future panics, or if called within an - /// asynchronous execution context. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::runtime::Runtime; - /// - /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); - /// - /// // Execute the future, blocking the current thread until completion - /// rt.block_on(async { - /// println!("hello"); - /// }); - /// ``` - /// - /// [handle]: fn@Handle::block_on - pub fn block_on<F: Future>(&mut self, future: F) -> F::Output { - let kind = &mut self.kind; + /// Handle to runtime, also contains driver handles + handle: Handle, - self.handle.enter(|| match kind { - Kind::Shell(exec) => exec.block_on(future), - #[cfg(feature = "rt-core")] - Kind::Basic(exec) => exec.block_on(future), - #[cfg(feature = "rt-threaded")] - Kind::ThreadPool(exec) => exec.block_on(future), - }) + /// Blocking pool handle, used to signal shutdown + blocking_pool: BlockingPool, } - /// Enter the runtime context. This allows you to construct types that must - /// have an executor available on creation such as [`Delay`] or [`TcpStream`]. - /// It will also allow you to call methods such as [`tokio::spawn`]. - /// - /// This function is also available as [`Handle::enter`]. - /// - /// [`Delay`]: struct@crate::time::Delay - /// [`TcpStream`]: struct@crate::net::TcpStream - /// [`Handle::enter`]: fn@crate::runtime::Handle::enter - /// [`tokio::spawn`]: fn@crate::spawn - /// - /// # Example - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// fn function_that_spawns(msg: String) { - /// // Had we not used `rt.enter` below, this would panic. - /// tokio::spawn(async move { - /// println!("{}", msg); - /// }); - /// } + /// Runtime context guard. /// - /// fn main() { - /// let rt = Runtime::new().unwrap(); - /// - /// let s = "Hello World!".to_string(); - /// - /// // By entering the context, we tie `tokio::spawn` to this executor. - /// rt.enter(|| function_that_spawns(s)); - /// } - /// ``` - pub fn enter<F, R>(&self, f: F) -> R - where - F: FnOnce() -> R, - { - self.handle.enter(f) + /// Returned by [`Runtime::enter`], the context guard exits the runtime + /// context on drop. + #[derive(Debug)] + pub struct EnterGuard<'a> { + rt: &'a Runtime, + guard: context::EnterGuard, } - /// Return a handle to the runtime's spawner. - /// - /// The returned handle can be used to spawn tasks that run on this runtime, and can - /// be cloned to allow moving the `Handle` to other threads. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// let handle = rt.handle(); - /// - /// handle.spawn(async { println!("hello"); }); - /// ``` - pub fn handle(&self) -> &Handle { - &self.handle - } + /// The runtime executor is either a thread-pool or a current-thread executor. + #[derive(Debug)] + enum Kind { + /// Execute all tasks on the current-thread. + CurrentThread(BasicScheduler<driver::Driver>), - /// Shutdown the runtime, waiting for at most `duration` for all spawned - /// task to shutdown. - /// - /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to - /// shutdown in a timely fashion. However, dropping a `Runtime` will wait - /// indefinitely for all tasks to terminate, and there are cases where a long - /// blocking task has been spawned, which can block dropping `Runtime`. - /// - /// In this case, calling `shutdown_timeout` with an explicit wait timeout - /// can work. The `shutdown_timeout` will signal all tasks to shutdown and - /// will wait for at most `duration` for all spawned tasks to terminate. If - /// `timeout` elapses before all tasks are dropped, the function returns and - /// outstanding tasks are potentially leaked. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::task; - /// - /// use std::thread; - /// use std::time::Duration; - /// - /// fn main() { - /// let mut runtime = Runtime::new().unwrap(); - /// - /// runtime.block_on(async move { - /// task::spawn_blocking(move || { - /// thread::sleep(Duration::from_secs(10_000)); - /// }); - /// }); - /// - /// runtime.shutdown_timeout(Duration::from_millis(100)); - /// } - /// ``` - pub fn shutdown_timeout(self, duration: Duration) { - let Runtime { - mut blocking_pool, .. - } = self; - blocking_pool.shutdown(Some(duration)); + /// Execute tasks across multiple threads. + #[cfg(feature = "rt-multi-thread")] + ThreadPool(ThreadPool), } - /// Shutdown the runtime, without waiting for any spawned tasks to shutdown. - /// - /// This can be useful if you want to drop a runtime from within another runtime. - /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks - /// to complete, which would normally not be permitted within an asynchronous context. - /// By calling `shutdown_background()`, you can drop the runtime from such a context. - /// - /// Note however, that because we do not wait for any blocking tasks to complete, this - /// may result in a resource leak (in that any blocking tasks are still running until they - /// return. - /// - /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`. - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// fn main() { - /// let mut runtime = Runtime::new().unwrap(); - /// - /// runtime.block_on(async move { - /// let inner_runtime = Runtime::new().unwrap(); - /// // ... - /// inner_runtime.shutdown_background(); - /// }); - /// } - /// ``` - pub fn shutdown_background(self) { - self.shutdown_timeout(Duration::from_nanos(0)) + /// After thread starts / before thread stops + type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; + + impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in the multi threaded scheduler, I/O driver, and time driver being + /// initialized. + /// + /// Most applications will not need to call this function directly. Instead, + /// they will use the [`#[tokio::main]` attribute][main]. When a more complex + /// configuration is necessary, the [runtime builder] may be used. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// ``` + /// + /// [mod]: index.html + /// [main]: ../attr.main.html + /// [threaded scheduler]: index.html#threaded-scheduler + /// [basic scheduler]: index.html#basic-scheduler + /// [runtime builder]: crate::runtime::Builder + #[cfg(feature = "rt-multi-thread")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] + pub fn new() -> std::io::Result<Runtime> { + Builder::new_multi_thread().enable_all().build() + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match &self.kind { + #[cfg(feature = "rt-multi-thread")] + Kind::ThreadPool(exec) => exec.spawn(future), + Kind::CurrentThread(exec) => exec.spawn(future), + } + } + + /// Run the provided function on an executor dedicated to blocking operations. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a blocking function onto the runtime + /// rt.spawn_blocking(|| { + /// println!("now running on a worker thread"); + /// }); + /// # } + pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> + where + F: FnOnce() -> R + Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Run a future to completion on the Tokio runtime. This is the + /// runtime's entry point. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers + /// which the future spawns internally will be executed on the runtime. + /// + /// # Multi thread scheduler + /// + /// When the multi thread scheduler is used this will allow futures + /// to run within the io driver and timer context of the overall runtime. + /// + /// # Current thread scheduler + /// + /// When the current thread scheduler is enabled `block_on` + /// can be called concurrently from multiple threads. The first call + /// will take ownership of the io and timer drivers. This means + /// other threads which do not own the drivers will hook into that one. + /// When the first `block_on` completes, other threads will be able to + /// "steal" the driver to allow continued execution of their futures. + /// + /// # Panics + /// + /// This function panics if the provided future panics, or if not called within an + /// asynchronous execution context. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Execute the future, blocking the current thread until completion + /// rt.block_on(async { + /// println!("hello"); + /// }); + /// ``` + /// + /// [handle]: fn@Handle::block_on + pub fn block_on<F: Future>(&self, future: F) -> F::Output { + let _enter = self.enter(); + + match &self.kind { + Kind::CurrentThread(exec) => exec.block_on(future), + #[cfg(feature = "rt-multi-thread")] + Kind::ThreadPool(exec) => exec.block_on(future), + } + } + + /// Enter the runtime context. + /// + /// This allows you to construct types that must have an executor + /// available on creation such as [`Sleep`] or [`TcpStream`]. It will + /// also allow you to call methods such as [`tokio::spawn`]. + /// + /// [`Sleep`]: struct@crate::time::Sleep + /// [`TcpStream`]: struct@crate::net::TcpStream + /// [`tokio::spawn`]: fn@crate::spawn + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// fn function_that_spawns(msg: String) { + /// // Had we not used `rt.enter` below, this would panic. + /// tokio::spawn(async move { + /// println!("{}", msg); + /// }); + /// } + /// + /// fn main() { + /// let rt = Runtime::new().unwrap(); + /// + /// let s = "Hello World!".to_string(); + /// + /// // By entering the context, we tie `tokio::spawn` to this executor. + /// let _guard = rt.enter(); + /// function_that_spawns(s); + /// } + /// ``` + pub fn enter(&self) -> EnterGuard<'_> { + EnterGuard { + rt: self, + guard: context::enter(self.handle.clone()), + } + } + + /// Shutdown the runtime, waiting for at most `duration` for all spawned + /// task to shutdown. + /// + /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to + /// shutdown in a timely fashion. However, dropping a `Runtime` will wait + /// indefinitely for all tasks to terminate, and there are cases where a long + /// blocking task has been spawned, which can block dropping `Runtime`. + /// + /// In this case, calling `shutdown_timeout` with an explicit wait timeout + /// can work. The `shutdown_timeout` will signal all tasks to shutdown and + /// will wait for at most `duration` for all spawned tasks to terminate. If + /// `timeout` elapses before all tasks are dropped, the function returns and + /// outstanding tasks are potentially leaked. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// use std::thread; + /// use std::time::Duration; + /// + /// fn main() { + /// let runtime = Runtime::new().unwrap(); + /// + /// runtime.block_on(async move { + /// task::spawn_blocking(move || { + /// thread::sleep(Duration::from_secs(10_000)); + /// }); + /// }); + /// + /// runtime.shutdown_timeout(Duration::from_millis(100)); + /// } + /// ``` + pub fn shutdown_timeout(mut self, duration: Duration) { + // Wakeup and shutdown all the worker threads + self.handle.spawner.shutdown(); + self.blocking_pool.shutdown(Some(duration)); + } + + /// Shutdown the runtime, without waiting for any spawned tasks to shutdown. + /// + /// This can be useful if you want to drop a runtime from within another runtime. + /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks + /// to complete, which would normally not be permitted within an asynchronous context. + /// By calling `shutdown_background()`, you can drop the runtime from such a context. + /// + /// Note however, that because we do not wait for any blocking tasks to complete, this + /// may result in a resource leak (in that any blocking tasks are still running until they + /// return. + /// + /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// fn main() { + /// let runtime = Runtime::new().unwrap(); + /// + /// runtime.block_on(async move { + /// let inner_runtime = Runtime::new().unwrap(); + /// // ... + /// inner_runtime.shutdown_background(); + /// }); + /// } + /// ``` + pub fn shutdown_background(self) { + self.shutdown_timeout(Duration::from_nanos(0)) + } } } diff --git a/src/runtime/park.rs b/src/runtime/park.rs index ee437d1..033b9f2 100644 --- a/src/runtime/park.rs +++ b/src/runtime/park.rs @@ -6,7 +6,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; use crate::park::{Park, Unpark}; -use crate::runtime::time; +use crate::runtime::driver::Driver; use crate::util::TryLock; use std::sync::atomic::Ordering::SeqCst; @@ -42,14 +42,14 @@ const NOTIFIED: usize = 3; /// Shared across multiple Parker handles struct Shared { /// Shared driver. Only one thread at a time can use this - driver: TryLock<time::Driver>, + driver: TryLock<Driver>, /// Unpark handle - handle: <time::Driver as Park>::Unpark, + handle: <Driver as Park>::Unpark, } impl Parker { - pub(crate) fn new(driver: time::Driver) -> Parker { + pub(crate) fn new(driver: Driver) -> Parker { let handle = driver.unpark(); Parker { @@ -104,6 +104,10 @@ impl Park for Parker { Ok(()) } } + + fn shutdown(&mut self) { + self.inner.shutdown(); + } } impl Unpark for Unparker { @@ -138,7 +142,7 @@ impl Inner { fn park_condvar(&self) { // Otherwise we need to coordinate going to sleep - let mut m = self.mutex.lock().unwrap(); + let mut m = self.mutex.lock(); match self .state @@ -176,7 +180,7 @@ impl Inner { } } - fn park_driver(&self, driver: &mut time::Driver) { + fn park_driver(&self, driver: &mut Driver) { match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) @@ -234,7 +238,7 @@ impl Inner { // Releasing `lock` before the call to `notify_one` means that when the // parked thread wakes it doesn't get woken only to have to wait for us // to release `lock`. - drop(self.mutex.lock().unwrap()); + drop(self.mutex.lock()); self.condvar.notify_one() } @@ -242,4 +246,12 @@ impl Inner { fn unpark_driver(&self) { self.shared.handle.unpark(); } + + fn shutdown(&self) { + if let Some(mut driver) = self.shared.driver.try_lock() { + driver.shutdown(); + } + + self.condvar.notify_all(); + } } diff --git a/src/runtime/queue.rs b/src/runtime/queue.rs index c654514..cdf4009 100644 --- a/src/runtime/queue.rs +++ b/src/runtime/queue.rs @@ -481,7 +481,7 @@ impl<T: 'static> Inject<T> { /// Close the injection queue, returns `true` if the queue is open when the /// transition is made. pub(super) fn close(&self) -> bool { - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if p.is_closed { return false; @@ -492,7 +492,7 @@ impl<T: 'static> Inject<T> { } pub(super) fn is_closed(&self) -> bool { - self.pointers.lock().unwrap().is_closed + self.pointers.lock().is_closed } pub(super) fn len(&self) -> usize { @@ -502,7 +502,7 @@ impl<T: 'static> Inject<T> { /// Pushes a value into the queue. pub(super) fn push(&self, task: task::Notified<T>) { // Acquire queue lock - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if p.is_closed { // Drop the mutex to avoid a potential deadlock when @@ -541,7 +541,7 @@ impl<T: 'static> Inject<T> { debug_assert!(get_next(batch_tail).is_none()); - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); if let Some(tail) = p.tail { set_next(tail, Some(batch_head)); @@ -566,7 +566,7 @@ impl<T: 'static> Inject<T> { return None; } - let mut p = self.pointers.lock().unwrap(); + let mut p = self.pointers.lock(); // It is possible to hit null here if another thread poped the last // task between us checking `len` and acquiring the lock. diff --git a/src/runtime/shell.rs b/src/runtime/shell.rs index a65869d..486d4fa 100644 --- a/src/runtime/shell.rs +++ b/src/runtime/shell.rs @@ -1,52 +1,84 @@ #![allow(clippy::redundant_clone)] +use crate::future::poll_fn; use crate::park::{Park, Unpark}; -use crate::runtime::enter; -use crate::runtime::time; +use crate::runtime::driver::Driver; +use crate::sync::Notify; use crate::util::{waker_ref, Wake}; -use std::future::Future; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::Context; -use std::task::Poll::Ready; +use std::task::Poll::{Pending, Ready}; +use std::{future::Future, sync::PoisonError}; #[derive(Debug)] pub(super) struct Shell { - driver: time::Driver, + driver: Mutex<Option<Driver>>, + + notify: Notify, /// TODO: don't store this unpark: Arc<Handle>, } #[derive(Debug)] -struct Handle(<time::Driver as Park>::Unpark); +struct Handle(<Driver as Park>::Unpark); impl Shell { - pub(super) fn new(driver: time::Driver) -> Shell { + pub(super) fn new(driver: Driver) -> Shell { let unpark = Arc::new(Handle(driver.unpark())); - Shell { driver, unpark } + Shell { + driver: Mutex::new(Some(driver)), + notify: Notify::new(), + unpark, + } } - pub(super) fn block_on<F>(&mut self, f: F) -> F::Output + pub(super) fn block_on<F>(&self, f: F) -> F::Output where F: Future, { - let _e = enter(true); + let mut enter = crate::runtime::enter(true); pin!(f); - let waker = waker_ref(&self.unpark); - let mut cx = Context::from_waker(&waker); - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return v; - } + if let Some(driver) = &mut self.take_driver() { + return driver.block_on(f); + } else { + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = f.as_mut().poll(cx) { + return Ready(Some(out)); + } - self.driver.park().unwrap(); + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } } } + + fn take_driver(&self) -> Option<DriverGuard<'_>> { + let mut lock = self.driver.lock().unwrap(); + let driver = lock.take()?; + + Some(DriverGuard { + inner: Some(driver), + shell: &self, + }) + } } impl Wake for Handle { @@ -60,3 +92,41 @@ impl Wake for Handle { arc_self.0.unpark(); } } + +struct DriverGuard<'a> { + inner: Option<Driver>, + shell: &'a Shell, +} + +impl DriverGuard<'_> { + fn block_on<F: Future>(&mut self, f: F) -> F::Output { + let driver = self.inner.as_mut().unwrap(); + + pin!(f); + + let waker = waker_ref(&self.shell.unpark); + let mut cx = Context::from_waker(&waker); + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return v; + } + + driver.park().unwrap(); + } + } +} + +impl Drop for DriverGuard<'_> { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + self.shell + .driver + .lock() + .unwrap_or_else(PoisonError::into_inner) + .replace(inner); + + self.shell.notify.notify_one(); + } + } +} diff --git a/src/runtime/spawner.rs b/src/runtime/spawner.rs index d136945..a37c667 100644 --- a/src/runtime/spawner.rs +++ b/src/runtime/spawner.rs @@ -1,24 +1,34 @@ -cfg_rt_core! { +cfg_rt! { use crate::runtime::basic_scheduler; use crate::task::JoinHandle; use std::future::Future; } -cfg_rt_threaded! { +cfg_rt_multi_thread! { use crate::runtime::thread_pool; } #[derive(Debug, Clone)] pub(crate) enum Spawner { - Shell, - #[cfg(feature = "rt-core")] + #[cfg(feature = "rt")] Basic(basic_scheduler::Spawner), - #[cfg(feature = "rt-threaded")] + #[cfg(feature = "rt-multi-thread")] ThreadPool(thread_pool::Spawner), } -cfg_rt_core! { +impl Spawner { + pub(crate) fn shutdown(&mut self) { + #[cfg(feature = "rt-multi-thread")] + { + if let Spawner::ThreadPool(spawner) = self { + spawner.shutdown(); + } + } + } +} + +cfg_rt! { impl Spawner { pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> where @@ -26,10 +36,9 @@ cfg_rt_core! { F::Output: Send + 'static, { match self { - Spawner::Shell => panic!("spawning not enabled for runtime"), - #[cfg(feature = "rt-core")] + #[cfg(feature = "rt")] Spawner::Basic(spawner) => spawner.spawn(future), - #[cfg(feature = "rt-threaded")] + #[cfg(feature = "rt-multi-thread")] Spawner::ThreadPool(spawner) => spawner.spawn(future), } } diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs index f4756c2..dfa8764 100644 --- a/src/runtime/task/core.rs +++ b/src/runtime/task/core.rs @@ -269,7 +269,7 @@ impl<T: Future, S: Schedule> Core<T, S> { } } -cfg_rt_threaded! { +cfg_rt_multi_thread! { impl Header { pub(crate) fn shutdown(&self) { use crate::runtime::task::RawTask; diff --git a/src/runtime/task/error.rs b/src/runtime/task/error.rs index d5f65a4..177fe65 100644 --- a/src/runtime/task/error.rs +++ b/src/runtime/task/error.rs @@ -3,7 +3,7 @@ use std::fmt; use std::io; use std::sync::Mutex; -doc_rt_core! { +cfg_rt! { /// Task failed to execute to completion. pub struct JoinError { repr: Repr, @@ -16,25 +16,13 @@ enum Repr { } impl JoinError { - #[doc(hidden)] - #[deprecated] - pub fn cancelled() -> JoinError { - Self::cancelled2() - } - - pub(crate) fn cancelled2() -> JoinError { + pub(crate) fn cancelled() -> JoinError { JoinError { repr: Repr::Cancelled, } } - #[doc(hidden)] - #[deprecated] - pub fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError { - Self::panic2(err) - } - - pub(crate) fn panic2(err: Box<dyn Any + Send + 'static>) -> JoinError { + pub(crate) fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError { JoinError { repr: Repr::Panic(Mutex::new(err)), } @@ -42,10 +30,7 @@ impl JoinError { /// Returns true if the error was caused by the task being cancelled pub fn is_cancelled(&self) -> bool { - match &self.repr { - Repr::Cancelled => true, - _ => false, - } + matches!(&self.repr, Repr::Cancelled) } /// Returns true if the error was caused by the task panicking @@ -65,10 +50,7 @@ impl JoinError { /// } /// ``` pub fn is_panic(&self) -> bool { - match &self.repr { - Repr::Panic(_) => true, - _ => false, - } + matches!(&self.repr, Repr::Panic(_)) } /// Consumes the join error, returning the object with which the task panicked. diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs index e86b29e..208d48c 100644 --- a/src/runtime/task/harness.rs +++ b/src/runtime/task/harness.rs @@ -102,7 +102,7 @@ where // If the task is cancelled, avoid polling it, instead signalling it // is complete. if snapshot.is_cancelled() { - Poll::Ready(Err(JoinError::cancelled2())) + Poll::Ready(Err(JoinError::cancelled())) } else { let res = guard.core.poll(self.header()); @@ -132,7 +132,7 @@ where } } Err(err) => { - self.complete(Err(JoinError::panic2(err)), snapshot.is_join_interested()); + self.complete(Err(JoinError::panic(err)), snapshot.is_join_interested()); } } } @@ -297,9 +297,9 @@ where // Dropping the future panicked, complete the join // handle with the panic to avoid dropping the panic // on the ground. - self.complete(Err(JoinError::panic2(err)), true); + self.complete(Err(JoinError::panic(err)), true); } else { - self.complete(Err(JoinError::cancelled2()), true); + self.complete(Err(JoinError::cancelled()), true); } } diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs index 3c4aabb..dedfb38 100644 --- a/src/runtime/task/join.rs +++ b/src/runtime/task/join.rs @@ -6,7 +6,7 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -doc_rt_core! { +cfg_rt! { /// An owned permission to join on a task (await its termination). /// /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for @@ -45,6 +45,71 @@ doc_rt_core! { /// # } /// ``` /// + /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task. + /// If the return value is an i32, the join handle has type `JoinHandle<i32>`: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<i32> = task::spawn(async { + /// 5 + 3 + /// }); + /// # } + /// + /// ``` + /// + /// If the task does not have a return value, the join handle has type `JoinHandle<()>`: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<()> = task::spawn(async { + /// println!("I return nothing."); + /// }); + /// # } + /// ``` + /// + /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a + /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has + /// to be double chained to extract the returned value: + /// + /// ``` + /// use tokio::task; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async { + /// Ok(5 + 3) + /// }); + /// + /// let result = join_handle.await??; + /// assert_eq!(result, 8); + /// Ok(()) + /// } + /// ``` + /// + /// If the task panics, the error is a [`JoinError`] that contains the panic: + /// + /// ``` + /// use tokio::task; + /// use std::io; + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async { + /// panic!("boom"); + /// }); + /// + /// let err = join_handle.await.unwrap_err(); + /// assert!(err.is_panic()); + /// Ok(()) + /// } + /// + /// ``` /// Child being detached and outliving its parent: /// /// ```no_run @@ -56,7 +121,7 @@ doc_rt_core! { /// let original_task = task::spawn(async { /// let _detached_task = task::spawn(async { /// // Here we sleep to make sure that the first task returns before. - /// time::delay_for(Duration::from_millis(10)).await; + /// time::sleep(Duration::from_millis(10)).await; /// // This will be called, even though the JoinHandle is dropped. /// println!("♫ Still alive ♫"); /// }); @@ -68,13 +133,14 @@ doc_rt_core! { /// // We make sure that the new task has time to run, before the main /// // task returns. /// - /// time::delay_for(Duration::from_millis(1000)).await; + /// time::sleep(Duration::from_millis(1000)).await; /// # } /// ``` /// /// [`task::spawn`]: crate::task::spawn() /// [`task::spawn_blocking`]: crate::task::spawn_blocking /// [`std::thread::JoinHandle`]: std::thread::JoinHandle + /// [`JoinError`]: crate::task::JoinError pub struct JoinHandle<T> { raw: Option<RawTask>, _p: PhantomData<T>, @@ -91,6 +157,44 @@ impl<T> JoinHandle<T> { _p: PhantomData, } } + + /// Abort the task associated with the handle. + /// + /// Awaiting a cancelled task might complete as usual if the task was + /// already completed at the time it was cancelled, but most likely it + /// will complete with a `Err(JoinError::Cancelled)`. + /// + /// ```rust + /// use tokio::time; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut handles = Vec::new(); + /// + /// handles.push(tokio::spawn(async { + /// time::sleep(time::Duration::from_secs(10)).await; + /// true + /// })); + /// + /// handles.push(tokio::spawn(async { + /// time::sleep(time::Duration::from_secs(10)).await; + /// false + /// })); + /// + /// for handle in &handles { + /// handle.abort(); + /// } + /// + /// for handle in handles { + /// assert!(handle.await.unwrap_err().is_cancelled()); + /// } + /// } + /// ``` + pub fn abort(&self) { + if let Some(raw) = self.raw { + raw.shutdown(); + } + } } impl<T> Unpin for JoinHandle<T> {} diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index 17b5157..7b49e95 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -21,7 +21,7 @@ use self::state::State; mod waker; -cfg_rt_threaded! { +cfg_rt_multi_thread! { mod stack; pub(crate) use self::stack::TransferStack; } @@ -79,25 +79,27 @@ pub(crate) trait Schedule: Sync + Sized + 'static { } } -/// Create a new task with an associated join handle -pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>) -where - T: Future + Send + 'static, - S: Schedule, -{ - let raw = RawTask::new::<_, S>(task); +cfg_rt! { + /// Create a new task with an associated join handle + pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>) + where + T: Future + Send + 'static, + S: Schedule, + { + let raw = RawTask::new::<_, S>(task); - let task = Task { - raw, - _p: PhantomData, - }; + let task = Task { + raw, + _p: PhantomData, + }; - let join = JoinHandle::new(raw); + let join = JoinHandle::new(raw); - (Notified(task), join) + (Notified(task), join) + } } -cfg_rt_util! { +cfg_rt! { /// Create a new `!Send` task with an associated join handle pub(crate) unsafe fn joinable_local<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>) where @@ -130,7 +132,7 @@ impl<S: 'static> Task<S> { } } -cfg_rt_threaded! { +cfg_rt_multi_thread! { impl<S: 'static> Notified<S> { pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> { Notified(Task::from_raw(ptr)) diff --git a/src/runtime/tests/loom_blocking.rs b/src/runtime/tests/loom_blocking.rs index db7048e..8fb54c5 100644 --- a/src/runtime/tests/loom_blocking.rs +++ b/src/runtime/tests/loom_blocking.rs @@ -8,14 +8,15 @@ fn blocking_shutdown() { let v = Arc::new(()); let rt = mk_runtime(1); - rt.enter(|| { + { + let _enter = rt.enter(); for _ in 0..2 { let v = v.clone(); crate::task::spawn_blocking(move || { assert!(1 < Arc::strong_count(&v)); }); } - }); + } drop(rt); assert_eq!(1, Arc::strong_count(&v)); @@ -23,9 +24,8 @@ fn blocking_shutdown() { } fn mk_runtime(num_threads: usize) -> Runtime { - runtime::Builder::new() - .threaded_scheduler() - .core_threads(num_threads) + runtime::Builder::new_multi_thread() + .worker_threads(num_threads) .build() .unwrap() } diff --git a/src/runtime/tests/loom_pool.rs b/src/runtime/tests/loom_pool.rs index c08658c..06ad641 100644 --- a/src/runtime/tests/loom_pool.rs +++ b/src/runtime/tests/loom_pool.rs @@ -178,7 +178,7 @@ mod group_b { #[test] fn join_output() { loom::model(|| { - let mut rt = mk_pool(1); + let rt = mk_pool(1); rt.block_on(async { let t = crate::spawn(track(async { "hello" })); @@ -192,7 +192,7 @@ mod group_b { #[test] fn poll_drop_handle_then_drop() { loom::model(|| { - let mut rt = mk_pool(1); + let rt = mk_pool(1); rt.block_on(async move { let mut t = crate::spawn(track(async { "hello" })); @@ -209,7 +209,7 @@ mod group_b { #[test] fn complete_block_on_under_load() { loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); pool.block_on(async { // Trigger a re-schedule @@ -296,9 +296,8 @@ mod group_d { } fn mk_pool(num_threads: usize) -> Runtime { - runtime::Builder::new() - .threaded_scheduler() - .core_threads(num_threads) + runtime::Builder::new_multi_thread() + .worker_threads(num_threads) .build() .unwrap() } diff --git a/src/runtime/tests/task.rs b/src/runtime/tests/task.rs index 82315a0..a34526f 100644 --- a/src/runtime/tests/task.rs +++ b/src/runtime/tests/task.rs @@ -1,5 +1,5 @@ use crate::runtime::task::{self, Schedule, Task}; -use crate::util::linked_list::LinkedList; +use crate::util::linked_list::{Link, LinkedList}; use crate::util::TryLock; use std::collections::VecDeque; @@ -72,7 +72,7 @@ struct Inner { struct Core { queue: VecDeque<task::Notified<Runtime>>, - tasks: LinkedList<Task<Runtime>>, + tasks: LinkedList<Task<Runtime>, <Task<Runtime> as Link>::Target>, } static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None); diff --git a/src/runtime/thread_pool/atomic_cell.rs b/src/runtime/thread_pool/atomic_cell.rs index 2bda0fc..98847e6 100644 --- a/src/runtime/thread_pool/atomic_cell.rs +++ b/src/runtime/thread_pool/atomic_cell.rs @@ -22,7 +22,6 @@ impl<T> AtomicCell<T> { from_raw(old) } - #[cfg(feature = "blocking")] pub(super) fn set(&self, val: Box<T>) { let _ = self.swap(Some(val)); } diff --git a/src/runtime/thread_pool/idle.rs b/src/runtime/thread_pool/idle.rs index ae87ca4..6e692fd 100644 --- a/src/runtime/thread_pool/idle.rs +++ b/src/runtime/thread_pool/idle.rs @@ -55,7 +55,7 @@ impl Idle { } // Acquire the lock - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); // Check again, now that the lock is acquired if !self.notify_should_wakeup() { @@ -77,7 +77,7 @@ impl Idle { /// work. pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool { // Acquire the lock - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); // Decrement the number of unparked threads let ret = State::dec_num_unparked(&self.state, is_searching); @@ -112,7 +112,7 @@ impl Idle { /// Unpark a specific worker. This happens if tasks are submitted from /// within the worker's park routine. pub(super) fn unpark_worker_by_id(&self, worker_id: usize) { - let mut sleepers = self.sleepers.lock().unwrap(); + let mut sleepers = self.sleepers.lock(); for index in 0..sleepers.len() { if sleepers[index] == worker_id { @@ -128,7 +128,7 @@ impl Idle { /// Returns `true` if `worker_id` is contained in the sleep set pub(super) fn is_parked(&self, worker_id: usize) -> bool { - let sleepers = self.sleepers.lock().unwrap(); + let sleepers = self.sleepers.lock(); sleepers.contains(&worker_id) } diff --git a/src/runtime/thread_pool/mod.rs b/src/runtime/thread_pool/mod.rs index ced9712..e39695a 100644 --- a/src/runtime/thread_pool/mod.rs +++ b/src/runtime/thread_pool/mod.rs @@ -9,9 +9,7 @@ use self::idle::Idle; mod worker; pub(crate) use worker::Launch; -cfg_blocking! { - pub(crate) use worker::block_in_place; -} +pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; use crate::runtime::task::{self, JoinHandle}; @@ -91,7 +89,7 @@ impl fmt::Debug for ThreadPool { impl Drop for ThreadPool { fn drop(&mut self) { - self.spawner.shared.close(); + self.spawner.shutdown(); } } @@ -108,6 +106,10 @@ impl Spawner { self.shared.schedule(task, false); handle } + + pub(crate) fn shutdown(&mut self) { + self.shared.close(); + } } impl fmt::Debug for Spawner { diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs index abe20da..bc544c9 100644 --- a/src/runtime/thread_pool/worker.rs +++ b/src/runtime/thread_pool/worker.rs @@ -9,10 +9,11 @@ use crate::loom::rand::seed; use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; use crate::runtime; +use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; use crate::runtime::thread_pool::{AtomicCell, Idle}; use crate::runtime::{queue, task}; -use crate::util::linked_list::LinkedList; +use crate::util::linked_list::{Link, LinkedList}; use crate::util::FastRand; use std::cell::RefCell; @@ -53,7 +54,7 @@ struct Core { is_shutdown: bool, /// Tasks owned by the core - tasks: LinkedList<Task>, + tasks: LinkedList<Task, <Task as Link>::Target>, /// Parker /// @@ -172,104 +173,96 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { (shared, launch) } -cfg_blocking! { - use crate::runtime::enter::EnterContext; - - pub(crate) fn block_in_place<F, R>(f: F) -> R - where - F: FnOnce() -> R, - { - // Try to steal the worker core back - struct Reset(coop::Budget); - - impl Drop for Reset { - fn drop(&mut self) { - CURRENT.with(|maybe_cx| { - if let Some(cx) = maybe_cx { - let core = cx.worker.core.take(); - let mut cx_core = cx.core.borrow_mut(); - assert!(cx_core.is_none()); - *cx_core = core; - - // Reset the task budget as we are re-entering the - // runtime. - coop::set(self.0); - } - }); - } +pub(crate) fn block_in_place<F, R>(f: F) -> R +where + F: FnOnce() -> R, +{ + // Try to steal the worker core back + struct Reset(coop::Budget); + + impl Drop for Reset { + fn drop(&mut self) { + CURRENT.with(|maybe_cx| { + if let Some(cx) = maybe_cx { + let core = cx.worker.core.take(); + let mut cx_core = cx.core.borrow_mut(); + assert!(cx_core.is_none()); + *cx_core = core; + + // Reset the task budget as we are re-entering the + // runtime. + coop::set(self.0); + } + }); } + } - let mut had_core = false; - let mut had_entered = false; + let mut had_entered = false; - CURRENT.with(|maybe_cx| { - match (crate::runtime::enter::context(), maybe_cx.is_some()) { - (EnterContext::Entered { .. }, true) => { - // We are on a thread pool runtime thread, so we just need to set up blocking. + CURRENT.with(|maybe_cx| { + match (crate::runtime::enter::context(), maybe_cx.is_some()) { + (EnterContext::Entered { .. }, true) => { + // We are on a thread pool runtime thread, so we just need to set up blocking. + had_entered = true; + } + (EnterContext::Entered { allow_blocking }, false) => { + // We are on an executor, but _not_ on the thread pool. + // That is _only_ okay if we are in a thread pool runtime's block_on method: + if allow_blocking { had_entered = true; - } - (EnterContext::Entered { allow_blocking }, false) => { - // We are on an executor, but _not_ on the thread pool. - // That is _only_ okay if we are in a thread pool runtime's block_on method: - if allow_blocking { - had_entered = true; - return; - } else { - // This probably means we are on the basic_scheduler or in a LocalSet, - // where it is _not_ okay to block. - panic!("can call blocking only when running on the multi-threaded runtime"); - } - } - (EnterContext::NotEntered, true) => { - // This is a nested call to block_in_place (we already exited). - // All the necessary setup has already been done. - return; - } - (EnterContext::NotEntered, false) => { - // We are outside of the tokio runtime, so blocking is fine. - // We can also skip all of the thread pool blocking setup steps. return; + } else { + // This probably means we are on the basic_scheduler or in a LocalSet, + // where it is _not_ okay to block. + panic!("can call blocking only when running on the multi-threaded runtime"); } } + (EnterContext::NotEntered, true) => { + // This is a nested call to block_in_place (we already exited). + // All the necessary setup has already been done. + return; + } + (EnterContext::NotEntered, false) => { + // We are outside of the tokio runtime, so blocking is fine. + // We can also skip all of the thread pool blocking setup steps. + return; + } + } - let cx = maybe_cx.expect("no .is_some() == false cases above should lead here"); - - // Get the worker core. If none is set, then blocking is fine! - let core = match cx.core.borrow_mut().take() { - Some(core) => core, - None => return, - }; - - // The parker should be set here - assert!(core.park.is_some()); + let cx = maybe_cx.expect("no .is_some() == false cases above should lead here"); - // In order to block, the core must be sent to another thread for - // execution. - // - // First, move the core back into the worker's shared core slot. - cx.worker.core.set(core); - had_core = true; + // Get the worker core. If none is set, then blocking is fine! + let core = match cx.core.borrow_mut().take() { + Some(core) => core, + None => return, + }; - // Next, clone the worker handle and send it to a new thread for - // processing. - // - // Once the blocking task is done executing, we will attempt to - // steal the core back. - let worker = cx.worker.clone(); - runtime::spawn_blocking(move || run(worker)); - }); + // The parker should be set here + assert!(core.park.is_some()); + + // In order to block, the core must be sent to another thread for + // execution. + // + // First, move the core back into the worker's shared core slot. + cx.worker.core.set(core); + + // Next, clone the worker handle and send it to a new thread for + // processing. + // + // Once the blocking task is done executing, we will attempt to + // steal the core back. + let worker = cx.worker.clone(); + runtime::spawn_blocking(move || run(worker)); + }); - if had_core { - // Unset the current task's budget. Blocking sections are not - // constrained by task budgets. - let _reset = Reset(coop::stop()); + if had_entered { + // Unset the current task's budget. Blocking sections are not + // constrained by task budgets. + let _reset = Reset(coop::stop()); - crate::runtime::enter::exit(f) - } else if had_entered { - crate::runtime::enter::exit(f) - } else { - f() - } + crate::runtime::enter::exit(f) + } else { + f() } } @@ -576,6 +569,8 @@ impl Core { // Drain the queue while self.next_local_task().is_some() {} + + park.shutdown(); } fn drain_pending_drop(&mut self, worker: &Worker) { @@ -785,7 +780,7 @@ impl Shared { /// /// If all workers have reached this point, the final cleanup is performed. fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) { - let mut workers = self.shutdown_workers.lock().unwrap(); + let mut workers = self.shutdown_workers.lock(); workers.push((core, worker)); if workers.len() != self.remotes.len() { diff --git a/src/runtime/time.rs b/src/runtime/time.rs deleted file mode 100644 index c623d96..0000000 --- a/src/runtime/time.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the time -//! driver. When the `time` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -pub(crate) use variant::*; - -#[cfg(feature = "time")] -mod variant { - use crate::park::Either; - use crate::runtime::io; - use crate::time::{self, driver}; - - pub(crate) type Clock = time::Clock; - pub(crate) type Driver = Either<driver::Driver<io::Driver>, io::Driver>; - pub(crate) type Handle = Option<driver::Handle>; - - pub(crate) fn create_clock() -> Clock { - Clock::new() - } - - /// Create a new timer driver / handle pair - pub(crate) fn create_driver( - enable: bool, - io_driver: io::Driver, - clock: Clock, - ) -> (Driver, Handle) { - if enable { - let driver = driver::Driver::new(io_driver, clock); - let handle = driver.handle(); - - (Either::A(driver), Some(handle)) - } else { - (Either::B(io_driver), None) - } - } -} - -#[cfg(not(feature = "time"))] -mod variant { - use crate::runtime::io; - - pub(crate) type Clock = (); - pub(crate) type Driver = io::Driver; - pub(crate) type Handle = (); - - pub(crate) fn create_clock() -> Clock { - () - } - - /// Create a new timer driver / handle pair - pub(crate) fn create_driver( - _enable: bool, - io_driver: io::Driver, - _clock: Clock, - ) -> (Driver, Handle) { - (io_driver, ()) - } -} |