diff options
Diffstat (limited to 'src/runtime/scheduler/mod.rs')
-rw-r--r-- | src/runtime/scheduler/mod.rs | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/src/runtime/scheduler/mod.rs b/src/runtime/scheduler/mod.rs new file mode 100644 index 0000000..f45d8a8 --- /dev/null +++ b/src/runtime/scheduler/mod.rs @@ -0,0 +1,194 @@ +cfg_rt! { + pub(crate) mod current_thread; + pub(crate) use current_thread::CurrentThread; +} + +cfg_rt_multi_thread! { + pub(crate) mod multi_thread; + pub(crate) use multi_thread::MultiThread; +} + +use crate::runtime::driver; + +#[derive(Debug, Clone)] +pub(crate) enum Handle { + #[cfg(feature = "rt")] + CurrentThread(Arc<current_thread::Handle>), + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + MultiThread(Arc<multi_thread::Handle>), + + // TODO: This is to avoid triggering "dead code" warnings many other places + // in the codebase. Remove this during a later cleanup + #[cfg(not(feature = "rt"))] + #[allow(dead_code)] + Disabled, +} + +impl Handle { + #[cfg_attr(not(feature = "full"), allow(dead_code))] + pub(crate) fn driver(&self) -> &driver::Handle { + match *self { + #[cfg(feature = "rt")] + Handle::CurrentThread(ref h) => &h.driver, + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(ref h) => &h.driver, + + #[cfg(not(feature = "rt"))] + Handle::Disabled => unreachable!(), + } + } +} + +cfg_rt! { + use crate::future::Future; + use crate::loom::sync::Arc; + use crate::runtime::{blocking, task::Id}; + use crate::runtime::context; + use crate::task::JoinHandle; + use crate::util::RngSeedGenerator; + + impl Handle { + #[track_caller] + pub(crate) fn current() -> Handle { + match context::try_current() { + Ok(handle) => handle, + Err(e) => panic!("{}", e), + } + } + + pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner { + match self { + Handle::CurrentThread(h) => &h.blocking_spawner, + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(h) => &h.blocking_spawner, + } + } + + pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id), + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id), + } + } + + pub(crate) fn shutdown(&self) { + match *self { + Handle::CurrentThread(_) => {}, + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(ref h) => h.shutdown(), + } + } + + pub(crate) fn seed_generator(&self) -> &RngSeedGenerator { + match self { + Handle::CurrentThread(h) => &h.seed_generator, + + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(h) => &h.seed_generator, + } + } + + pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> { + match self { + Handle::CurrentThread(handle) => handle, + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + _ => panic!("not a CurrentThread handle"), + } + } + } + + cfg_metrics! { + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + + impl Handle { + pub(crate) fn num_workers(&self) -> usize { + match self { + Handle::CurrentThread(_) => 1, + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.num_workers(), + } + } + + pub(crate) fn num_blocking_threads(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.num_blocking_threads(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.num_blocking_threads(), + } + } + + pub(crate) fn num_idle_blocking_threads(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.num_idle_blocking_threads(), + } + } + + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + match self { + Handle::CurrentThread(handle) => handle.scheduler_metrics(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.scheduler_metrics(), + } + } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + match self { + Handle::CurrentThread(handle) => handle.worker_metrics(worker), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.worker_metrics(worker), + } + } + + pub(crate) fn injection_queue_depth(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.injection_queue_depth(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.injection_queue_depth(), + } + } + + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + match self { + Handle::CurrentThread(handle) => handle.worker_metrics(worker).queue_depth(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker), + } + } + + pub(crate) fn blocking_queue_depth(&self) -> usize { + match self { + Handle::CurrentThread(handle) => handle.blocking_queue_depth(), + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Handle::MultiThread(handle) => handle.blocking_queue_depth(), + } + } + } + } +} + +cfg_not_rt! { + #[cfg(any( + feature = "net", + all(unix, feature = "process"), + all(unix, feature = "signal"), + feature = "time", + ))] + impl Handle { + #[track_caller] + pub(crate) fn current() -> Handle { + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) + } + } +} |