aboutsummaryrefslogtreecommitdiff
path: root/src/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime')
-rw-r--r--src/runtime/basic_scheduler.rs190
-rw-r--r--src/runtime/blocking/mod.rs21
-rw-r--r--src/runtime/blocking/pool.rs72
-rw-r--r--src/runtime/blocking/shutdown.rs11
-rw-r--r--src/runtime/builder.rs350
-rw-r--r--src/runtime/context.rs50
-rw-r--r--src/runtime/driver.rs205
-rw-r--r--src/runtime/enter.rs210
-rw-r--r--src/runtime/handle.rs385
-rw-r--r--src/runtime/io.rs63
-rw-r--r--src/runtime/mod.rs733
-rw-r--r--src/runtime/park.rs26
-rw-r--r--src/runtime/queue.rs10
-rw-r--r--src/runtime/shell.rs106
-rw-r--r--src/runtime/spawner.rs27
-rw-r--r--src/runtime/task/core.rs2
-rw-r--r--src/runtime/task/error.rs28
-rw-r--r--src/runtime/task/harness.rs8
-rw-r--r--src/runtime/task/join.rs110
-rw-r--r--src/runtime/task/mod.rs34
-rw-r--r--src/runtime/tests/loom_blocking.rs10
-rw-r--r--src/runtime/tests/loom_pool.rs11
-rw-r--r--src/runtime/tests/task.rs4
-rw-r--r--src/runtime/thread_pool/atomic_cell.rs1
-rw-r--r--src/runtime/thread_pool/idle.rs8
-rw-r--r--src/runtime/thread_pool/mod.rs10
-rw-r--r--src/runtime/thread_pool/worker.rs173
-rw-r--r--src/runtime/time.rs59
28 files changed, 1491 insertions, 1426 deletions
diff --git a/src/runtime/basic_scheduler.rs b/src/runtime/basic_scheduler.rs
index 7e1c257..5ca8467 100644
--- a/src/runtime/basic_scheduler.rs
+++ b/src/runtime/basic_scheduler.rs
@@ -1,22 +1,35 @@
+use crate::future::poll_fn;
+use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
-use crate::runtime;
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
-use crate::util::linked_list::LinkedList;
-use crate::util::{waker_ref, Wake};
+use crate::sync::notify::Notify;
+use crate::util::linked_list::{Link, LinkedList};
+use crate::util::{waker_ref, Wake, WakerRef};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
-use std::sync::{Arc, Mutex};
-use std::task::Poll::Ready;
+use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
use std::time::Duration;
/// Executes tasks on the current thread
-pub(crate) struct BasicScheduler<P>
-where
- P: Park,
-{
+pub(crate) struct BasicScheduler<P: Park> {
+ /// Inner state guarded by a mutex that is shared
+ /// between all `block_on` calls.
+ inner: Mutex<Option<Inner<P>>>,
+
+ /// Notifier for waking up other threads to steal the
+ /// parker.
+ notify: Notify,
+
+ /// Sendable task spawner
+ spawner: Spawner,
+}
+
+/// The inner scheduler that owns the task queue and the main parker P.
+struct Inner<P: Park> {
/// Scheduler run queue
///
/// When the scheduler is executed, the queue is removed from `self` and
@@ -42,7 +55,7 @@ pub(crate) struct Spawner {
struct Tasks {
/// Collection of all active tasks spawned onto this executor.
- owned: LinkedList<Task<Arc<Shared>>>,
+ owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>,
/// Local run queue.
///
@@ -59,7 +72,7 @@ struct Shared {
unpark: Box<dyn Unpark>,
}
-/// Thread-local context
+/// Thread-local context.
struct Context {
/// Shared scheduler state
shared: Arc<Shared>,
@@ -68,38 +81,43 @@ struct Context {
tasks: RefCell<Tasks>,
}
-/// Initial queue capacity
+/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;
/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;
-/// How often ot check the remote queue first
+/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;
-// Tracks the current BasicScheduler
+// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);
-impl<P> BasicScheduler<P>
-where
- P: Park,
-{
+impl<P: Park> BasicScheduler<P> {
pub(crate) fn new(park: P) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());
- BasicScheduler {
+ let spawner = Spawner {
+ shared: Arc::new(Shared {
+ queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+ unpark: unpark as Box<dyn Unpark>,
+ }),
+ };
+
+ let inner = Mutex::new(Some(Inner {
tasks: Some(Tasks {
owned: LinkedList::new(),
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
- spawner: Spawner {
- shared: Arc::new(Shared {
- queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
- unpark: unpark as Box<dyn Unpark>,
- }),
- },
+ spawner: spawner.clone(),
tick: 0,
park,
+ }));
+
+ BasicScheduler {
+ inner,
+ notify: Notify::new(),
+ spawner,
}
}
@@ -116,13 +134,57 @@ where
self.spawner.spawn(future)
}
- pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
- where
- F: Future,
- {
+ pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
+ pin!(future);
+
+ // Attempt to steal the dedicated parker and block_on the future if we can there,
+ // othwerwise, lets select on a notification that the parker is available
+ // or the future is complete.
+ loop {
+ if let Some(inner) = &mut self.take_inner() {
+ return inner.block_on(future);
+ } else {
+ let mut enter = crate::runtime::enter(false);
+
+ let notified = self.notify.notified();
+ pin!(notified);
+
+ if let Some(out) = enter
+ .block_on(poll_fn(|cx| {
+ if notified.as_mut().poll(cx).is_ready() {
+ return Ready(None);
+ }
+
+ if let Ready(out) = future.as_mut().poll(cx) {
+ return Ready(Some(out));
+ }
+
+ Pending
+ }))
+ .expect("Failed to `Enter::block_on`")
+ {
+ return out;
+ }
+ }
+ }
+ }
+
+ fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
+ let inner = self.inner.lock().take()?;
+
+ Some(InnerGuard {
+ inner: Some(inner),
+ basic_scheduler: &self,
+ })
+ }
+}
+
+impl<P: Park> Inner<P> {
+ /// Block on the future provided and drive the runtime's driver.
+ fn block_on<F: Future>(&mut self, future: F) -> F::Output {
enter(self, |scheduler, context| {
- let _enter = runtime::enter(false);
- let waker = waker_ref(&scheduler.spawner.shared);
+ let _enter = crate::runtime::enter(false);
+ let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
pin!(future);
@@ -177,16 +239,16 @@ where
/// Enter the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local
-fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
+fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
- F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
+ F: FnOnce(&mut Inner<P>, &Context) -> R,
P: Park,
{
// Ensures the run queue is placed back in the `BasicScheduler` instance
// once `block_on` returns.`
struct Guard<'a, P: Park> {
context: Option<Context>,
- scheduler: &'a mut BasicScheduler<P>,
+ scheduler: &'a mut Inner<P>,
}
impl<P: Park> Drop for Guard<'_, P> {
@@ -213,12 +275,18 @@ where
CURRENT.set(context, || f(scheduler, context))
}
-impl<P> Drop for BasicScheduler<P>
-where
- P: Park,
-{
+impl<P: Park> Drop for BasicScheduler<P> {
fn drop(&mut self) {
- enter(self, |scheduler, context| {
+ // Avoid a double panic if we are currently panicking and
+ // the lock may be poisoned.
+
+ let mut inner = match self.inner.lock().take() {
+ Some(inner) => inner,
+ None if std::thread::panicking() => return,
+ None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
+ };
+
+ enter(&mut inner, |scheduler, context| {
// Loop required here to ensure borrow is dropped between iterations
#[allow(clippy::while_let_loop)]
loop {
@@ -236,7 +304,7 @@ where
}
// Drain remote queue
- for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) {
+ for task in scheduler.spawner.shared.queue.lock().drain(..) {
task.shutdown();
}
@@ -266,7 +334,11 @@ impl Spawner {
}
fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
- self.shared.queue.lock().unwrap().pop_front()
+ self.shared.queue.lock().pop_front()
+ }
+
+ fn waker_ref(&self) -> WakerRef<'_> {
+ waker_ref(&self.shared)
}
}
@@ -307,7 +379,7 @@ impl Schedule for Arc<Shared> {
cx.tasks.borrow_mut().queue.push_back(task);
}
_ => {
- self.queue.lock().unwrap().push_back(task);
+ self.queue.lock().push_back(task);
self.unpark.unpark();
}
});
@@ -324,3 +396,37 @@ impl Wake for Shared {
arc_self.unpark.unpark();
}
}
+
+// ===== InnerGuard =====
+
+/// Used to ensure we always place the Inner value
+/// back into its slot in `BasicScheduler`, even if the
+/// future panics.
+struct InnerGuard<'a, P: Park> {
+ inner: Option<Inner<P>>,
+ basic_scheduler: &'a BasicScheduler<P>,
+}
+
+impl<P: Park> InnerGuard<'_, P> {
+ fn block_on<F: Future>(&mut self, future: F) -> F::Output {
+ // The only time inner gets set to `None` is if we have dropped
+ // already so this unwrap is safe.
+ self.inner.as_mut().unwrap().block_on(future)
+ }
+}
+
+impl<P: Park> Drop for InnerGuard<'_, P> {
+ fn drop(&mut self) {
+ if let Some(scheduler) = self.inner.take() {
+ let mut lock = self.basic_scheduler.inner.lock();
+
+ // Replace old scheduler back into the state to allow
+ // other threads to pick it up and drive it.
+ lock.replace(scheduler);
+
+ // Wake up other possible threads that could steal
+ // the dedicated parker P.
+ self.basic_scheduler.notify.notify_one()
+ }
+ }
+}
diff --git a/src/runtime/blocking/mod.rs b/src/runtime/blocking/mod.rs
index 0b36a75..fece3c2 100644
--- a/src/runtime/blocking/mod.rs
+++ b/src/runtime/blocking/mod.rs
@@ -3,22 +3,20 @@
//! shells. This isolates the complexity of dealing with conditional
//! compilation.
-cfg_blocking_impl! {
- mod pool;
- pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner};
+mod pool;
+pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
- mod schedule;
- mod shutdown;
- pub(crate) mod task;
+mod schedule;
+mod shutdown;
+pub(crate) mod task;
- use crate::runtime::Builder;
-
- pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
- BlockingPool::new(builder, thread_cap)
+use crate::runtime::Builder;
- }
+pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ BlockingPool::new(builder, thread_cap)
}
+/*
cfg_not_blocking_impl! {
use crate::runtime::Builder;
use std::time::Duration;
@@ -41,3 +39,4 @@ cfg_not_blocking_impl! {
}
}
}
+*/
diff --git a/src/runtime/blocking/pool.rs b/src/runtime/blocking/pool.rs
index 40d417b..2967a10 100644
--- a/src/runtime/blocking/pool.rs
+++ b/src/runtime/blocking/pool.rs
@@ -5,9 +5,13 @@ use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
+use crate::runtime::builder::ThreadNameFn;
+use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
+use slab::Slab;
+
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
@@ -30,7 +34,7 @@ struct Inner {
condvar: Condvar,
/// Spawned threads use this name
- thread_name: String,
+ thread_name: ThreadNameFn,
/// Spawned thread stack size
stack_size: Option<usize>,
@@ -41,7 +45,11 @@ struct Inner {
/// Call before a thread stops
before_stop: Option<Callback>,
+ // Maximum number of threads
thread_cap: usize,
+
+ // Customizable wait timeout
+ keep_alive: Duration,
}
struct Shared {
@@ -51,6 +59,7 @@ struct Shared {
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
+ worker_threads: Slab<thread::JoinHandle<()>>,
}
type Task = task::Notified<NoopSchedule>;
@@ -62,11 +71,8 @@ pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
- let rt = Handle::current();
-
- let (task, handle) = task::joinable(BlockingTask::new(func));
- let _ = rt.blocking_spawner.spawn(task, &rt);
- handle
+ let rt = context::current().expect("not currently running on the Tokio runtime.");
+ rt.spawn_blocking(func)
}
#[allow(dead_code)]
@@ -74,7 +80,7 @@ pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
where
F: FnOnce() -> R + Send + 'static,
{
- let rt = Handle::current();
+ let rt = context::current().expect("not currently running on the Tokio runtime.");
let (task, _handle) = task::joinable(BlockingTask::new(func));
rt.blocking_spawner.spawn(task, &rt)
@@ -85,6 +91,7 @@ where
impl BlockingPool {
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
+ let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);
BlockingPool {
spawner: Spawner {
@@ -96,6 +103,7 @@ impl BlockingPool {
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
+ worker_threads: Slab::new(),
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
@@ -103,6 +111,7 @@ impl BlockingPool {
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
thread_cap,
+ keep_alive,
}),
},
shutdown_rx,
@@ -114,7 +123,7 @@ impl BlockingPool {
}
pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
- let mut shared = self.spawner.inner.shared.lock().unwrap();
+ let mut shared = self.spawner.inner.shared.lock();
// The function can be called multiple times. First, by explicitly
// calling `shutdown` then by the drop handler calling `shutdown`. This
@@ -126,10 +135,15 @@ impl BlockingPool {
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
+ let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());
drop(shared);
- self.shutdown_rx.wait(timeout);
+ if self.shutdown_rx.wait(timeout) {
+ for handle in workers.drain() {
+ let _ = handle.join();
+ }
+ }
}
}
@@ -150,7 +164,7 @@ impl fmt::Debug for BlockingPool {
impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
- let mut shared = self.inner.shared.lock().unwrap();
+ let mut shared = self.inner.shared.lock();
if shared.shutdown {
// Shutdown the task
@@ -187,14 +201,24 @@ impl Spawner {
};
if let Some(shutdown_tx) = shutdown_tx {
- self.spawn_thread(shutdown_tx, rt);
+ let mut shared = self.inner.shared.lock();
+ let entry = shared.worker_threads.vacant_entry();
+
+ let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
+
+ entry.insert(handle);
}
Ok(())
}
- fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
- let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
+ fn spawn_thread(
+ &self,
+ shutdown_tx: shutdown::Sender,
+ rt: &Handle,
+ worker_id: usize,
+ ) -> thread::JoinHandle<()> {
+ let mut builder = thread::Builder::new().name((self.inner.thread_name)());
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
@@ -205,23 +229,21 @@ impl Spawner {
builder
.spawn(move || {
// Only the reference should be moved into the closure
- let rt = &rt;
- rt.enter(move || {
- rt.blocking_spawner.inner.run();
- drop(shutdown_tx);
- })
+ let _enter = crate::runtime::context::enter(rt.clone());
+ rt.blocking_spawner.inner.run(worker_id);
+ drop(shutdown_tx);
})
- .unwrap();
+ .unwrap()
}
}
impl Inner {
- fn run(&self) {
+ fn run(&self, worker_id: usize) {
if let Some(f) = &self.after_start {
f()
}
- let mut shared = self.shared.lock().unwrap();
+ let mut shared = self.shared.lock();
'main: loop {
// BUSY
@@ -229,14 +251,14 @@ impl Inner {
drop(shared);
task.run();
- shared = self.shared.lock().unwrap();
+ shared = self.shared.lock();
}
// IDLE
shared.num_idle += 1;
while !shared.shutdown {
- let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
+ let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
@@ -252,6 +274,8 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
+ shared.worker_threads.remove(worker_id);
+
break 'main;
}
@@ -264,7 +288,7 @@ impl Inner {
drop(shared);
task.shutdown();
- shared = self.shared.lock().unwrap();
+ shared = self.shared.lock();
}
// Work was produced, and we "took" it (by decrementing num_notify).
diff --git a/src/runtime/blocking/shutdown.rs b/src/runtime/blocking/shutdown.rs
index e76a701..3b6cc59 100644
--- a/src/runtime/blocking/shutdown.rs
+++ b/src/runtime/blocking/shutdown.rs
@@ -32,11 +32,13 @@ impl Receiver {
/// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
/// duration. If `timeout` is `None`, then the thread is blocked until the
/// shutdown signal is received.
- pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
+ ///
+ /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
+ pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
use crate::runtime::enter::try_enter;
if timeout == Some(Duration::from_nanos(0)) {
- return;
+ return true;
}
let mut e = match try_enter(false) {
@@ -44,7 +46,7 @@ impl Receiver {
_ => {
if std::thread::panicking() {
// Don't panic in a panic
- return;
+ return false;
} else {
panic!(
"Cannot drop a runtime in a context where blocking is not allowed. \
@@ -60,9 +62,10 @@ impl Receiver {
// current thread (usually, shutting down a runtime stored in a
// thread-local).
if let Some(timeout) = timeout {
- let _ = e.block_on_timeout(&mut self.rx, timeout);
+ e.block_on_timeout(&mut self.rx, timeout).is_ok()
} else {
let _ = e.block_on(&mut self.rx);
+ true
}
}
}
diff --git a/src/runtime/builder.rs b/src/runtime/builder.rs
index fad72c7..e792c7d 100644
--- a/src/runtime/builder.rs
+++ b/src/runtime/builder.rs
@@ -1,23 +1,24 @@
use crate::runtime::handle::Handle;
-use crate::runtime::shell::Shell;
-use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
+use crate::runtime::{blocking, driver, Callback, Runtime, Spawner};
use std::fmt;
-#[cfg(not(loom))]
-use std::sync::Arc;
+use std::io;
+use std::time::Duration;
/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
-/// New instances of `Builder` are obtained via [`Builder::new`].
+/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
+/// or [`Builder::new_current_thread`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: method@Self::build
-/// [`Builder::new`]: method@Self::new
+/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
+/// [`Builder::new_current_thread`]: method@Self::new_current_thread
///
/// # Examples
///
@@ -26,9 +27,8 @@ use std::sync::Arc;
///
/// fn main() {
/// // build runtime
-/// let runtime = Builder::new()
-/// .threaded_scheduler()
-/// .core_threads(4)
+/// let runtime = Builder::new_multi_thread()
+/// .worker_threads(4)
/// .thread_name("my-custom-name")
/// .thread_stack_size(3 * 1024 * 1024)
/// .build()
@@ -38,7 +38,7 @@ use std::sync::Arc;
/// }
/// ```
pub struct Builder {
- /// The task execution model to use.
+ /// Runtime type
kind: Kind,
/// Whether or not to enable the I/O driver
@@ -50,13 +50,13 @@ pub struct Builder {
/// The number of worker threads, used by Runtime.
///
/// Only used when not using the current-thread executor.
- core_threads: Option<usize>,
+ worker_threads: Option<usize>,
/// Cap on thread usage.
max_threads: usize,
- /// Name used for threads spawned by the runtime.
- pub(super) thread_name: String,
+ /// Name fn used for threads spawned by the runtime.
+ pub(super) thread_name: ThreadNameFn,
/// Stack size used for threads spawned by the runtime.
pub(super) thread_stack_size: Option<usize>,
@@ -66,26 +66,43 @@ pub struct Builder {
/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,
+
+ /// Customizable keep alive timeout for BlockingPool
+ pub(super) keep_alive: Option<Duration>,
}
-#[derive(Debug, Clone, Copy)]
-enum Kind {
- Shell,
- #[cfg(feature = "rt-core")]
- Basic,
- #[cfg(feature = "rt-threaded")]
- ThreadPool,
+pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
+
+pub(crate) enum Kind {
+ CurrentThread,
+ #[cfg(feature = "rt-multi-thread")]
+ MultiThread,
}
impl Builder {
+ /// Returns a new builder with the current thread scheduler selected.
+ ///
+ /// Configuration methods can be chained on the return value.
+ pub fn new_current_thread() -> Builder {
+ Builder::new(Kind::CurrentThread)
+ }
+
+ /// Returns a new builder with the multi thread scheduler selected.
+ ///
+ /// Configuration methods can be chained on the return value.
+ #[cfg(feature = "rt-multi-thread")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
+ pub fn new_multi_thread() -> Builder {
+ Builder::new(Kind::MultiThread)
+ }
+
/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
- pub fn new() -> Builder {
+ pub(crate) fn new(kind: Kind) -> Builder {
Builder {
- // No task execution by default
- kind: Kind::Shell,
+ kind,
// I/O defaults to "off"
enable_io: false,
@@ -94,12 +111,12 @@ impl Builder {
enable_time: false,
// Default to lazy auto-detection (one thread per CPU core)
- core_threads: None,
+ worker_threads: None,
max_threads: 512,
// Default thread name
- thread_name: "tokio-runtime-worker".into(),
+ thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),
// Do not set a stack size by default
thread_stack_size: None,
@@ -107,6 +124,8 @@ impl Builder {
// No worker thread callbacks
after_start: None,
before_stop: None,
+
+ keep_alive: None,
}
}
@@ -121,14 +140,13 @@ impl Builder {
/// ```
/// use tokio::runtime;
///
- /// let rt = runtime::Builder::new()
- /// .threaded_scheduler()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .enable_all()
/// .build()
/// .unwrap();
/// ```
pub fn enable_all(&mut self) -> &mut Self {
- #[cfg(feature = "io-driver")]
+ #[cfg(any(feature = "net", feature = "process", all(unix, feature = "signal")))]
self.enable_io();
#[cfg(feature = "time")]
self.enable_time();
@@ -136,51 +154,68 @@ impl Builder {
self
}
- #[deprecated(note = "In future will be replaced by core_threads method")]
- /// Sets the maximum number of worker threads for the `Runtime`'s thread pool.
+ /// Sets the number of worker threads the `Runtime` will use.
+ ///
+ /// This should be a number between 0 and 32,768 though it is advised to
+ /// keep this value on the smaller side.
///
- /// This must be a number between 1 and 32,768 though it is advised to keep
- /// this value on the smaller side.
+ /// # Default
///
/// The default value is the number of cores available to the system.
- pub fn num_threads(&mut self, val: usize) -> &mut Self {
- self.core_threads = Some(val);
- self
- }
-
- /// Sets the core number of worker threads for the `Runtime`'s thread pool.
///
- /// This should be a number between 1 and 32,768 though it is advised to keep
- /// this value on the smaller side.
+ /// # Panic
///
- /// The default value is the number of cores available to the system.
+ /// When using the `current_thread` runtime this method will panic, since
+ /// those variants do not allow setting worker thread counts.
///
- /// These threads will be always active and running.
///
/// # Examples
///
+ /// ## Multi threaded runtime with 4 threads
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// // This will spawn a work-stealing runtime with 4 worker threads.
+ /// let rt = runtime::Builder::new_multi_thread()
+ /// .worker_threads(4)
+ /// .build()
+ /// .unwrap();
+ ///
+ /// rt.spawn(async move {});
+ /// ```
+ ///
+ /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
+ ///
/// ```
/// use tokio::runtime;
///
- /// let rt = runtime::Builder::new()
- /// .threaded_scheduler()
- /// .core_threads(4)
+ /// // Create a runtime that _must_ be driven from a call
+ /// // to `Runtime::block_on`.
+ /// let rt = runtime::Builder::new_current_thread()
/// .build()
/// .unwrap();
+ ///
+ /// // This will run the runtime and future on the current thread
+ /// rt.block_on(async move {});
/// ```
- pub fn core_threads(&mut self, val: usize) -> &mut Self {
- assert_ne!(val, 0, "Core threads cannot be zero");
- self.core_threads = Some(val);
+ ///
+ /// # Panic
+ ///
+ /// This will panic if `val` is not larger than `0`.
+ pub fn worker_threads(&mut self, val: usize) -> &mut Self {
+ assert!(val > 0, "Worker threads cannot be set to 0");
+ self.worker_threads = Some(val);
self
}
/// Specifies limit for threads, spawned by the Runtime.
///
/// This is number of threads to be used by Runtime, including `core_threads`
- /// Having `max_threads` less than `core_threads` results in invalid configuration
+ /// Having `max_threads` less than `worker_threads` results in invalid configuration
/// when building multi-threaded `Runtime`, which would cause a panic.
///
- /// Similarly to the `core_threads`, this number should be between 1 and 32,768.
+ /// Similarly to the `worker_threads`, this number should be between 0 and 32,768.
///
/// The default value is 512.
///
@@ -189,7 +224,6 @@ impl Builder {
/// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for
/// blocking annotations) as `max_threads - core_threads`.
pub fn max_threads(&mut self, val: usize) -> &mut Self {
- assert_ne!(val, 0, "Thread limit cannot be zero");
self.max_threads = val;
self
}
@@ -204,13 +238,42 @@ impl Builder {
/// # use tokio::runtime;
///
/// # pub fn main() {
- /// let rt = runtime::Builder::new()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .thread_name("my-pool")
/// .build();
/// # }
/// ```
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
- self.thread_name = val.into();
+ let val = val.into();
+ self.thread_name = std::sync::Arc::new(move || val.clone());
+ self
+ }
+
+ /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
+ ///
+ /// The default name fn is `|| "tokio-runtime-worker".into()`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ /// # use std::sync::atomic::{AtomicUsize, Ordering};
+ ///
+ /// # pub fn main() {
+ /// let rt = runtime::Builder::new_multi_thread()
+ /// .thread_name_fn(|| {
+ /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
+ /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
+ /// format!("my-pool-{}", id)
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() -> String + Send + Sync + 'static,
+ {
+ self.thread_name = std::sync::Arc::new(f);
self
}
@@ -228,8 +291,7 @@ impl Builder {
/// # use tokio::runtime;
///
/// # pub fn main() {
- /// let rt = runtime::Builder::new()
- /// .threaded_scheduler()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .thread_stack_size(32 * 1024)
/// .build();
/// # }
@@ -250,8 +312,7 @@ impl Builder {
/// # use tokio::runtime;
///
/// # pub fn main() {
- /// let runtime = runtime::Builder::new()
- /// .threaded_scheduler()
+ /// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_start(|| {
/// println!("thread started");
/// })
@@ -263,7 +324,7 @@ impl Builder {
where
F: Fn() + Send + Sync + 'static,
{
- self.after_start = Some(Arc::new(f));
+ self.after_start = Some(std::sync::Arc::new(f));
self
}
@@ -277,8 +338,7 @@ impl Builder {
/// # use tokio::runtime;
///
/// # pub fn main() {
- /// let runtime = runtime::Builder::new()
- /// .threaded_scheduler()
+ /// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_stop(|| {
/// println!("thread stopping");
/// })
@@ -290,56 +350,86 @@ impl Builder {
where
F: Fn() + Send + Sync + 'static,
{
- self.before_stop = Some(Arc::new(f));
+ self.before_stop = Some(std::sync::Arc::new(f));
self
}
/// Creates the configured `Runtime`.
///
- /// The returned `ThreadPool` instance is ready to spawn tasks.
+ /// The returned `Runtime` instance is ready to spawn tasks.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
- /// let mut rt = Builder::new().build().unwrap();
+ /// let rt = Builder::new_multi_thread().build().unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
/// });
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
- match self.kind {
- Kind::Shell => self.build_shell_runtime(),
- #[cfg(feature = "rt-core")]
- Kind::Basic => self.build_basic_runtime(),
- #[cfg(feature = "rt-threaded")]
- Kind::ThreadPool => self.build_threaded_runtime(),
+ match &self.kind {
+ Kind::CurrentThread => self.build_basic_runtime(),
+ #[cfg(feature = "rt-multi-thread")]
+ Kind::MultiThread => self.build_threaded_runtime(),
}
}
- fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
- use crate::runtime::Kind;
+ fn get_cfg(&self) -> driver::Cfg {
+ driver::Cfg {
+ enable_io: self.enable_io,
+ enable_time: self.enable_time,
+ }
+ }
- let clock = time::create_clock();
+ /// Sets a custom timeout for a thread in the blocking pool.
+ ///
+ /// By default, the timeout for a thread is set to 10 seconds. This can
+ /// be overriden using .thread_keep_alive().
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ /// # use std::time::Duration;
+ ///
+ /// # pub fn main() {
+ /// let rt = runtime::Builder::new_multi_thread()
+ /// .thread_keep_alive(Duration::from_millis(100))
+ /// .build();
+ /// # }
+ /// ```
+ pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
+ self.keep_alive = Some(duration);
+ self
+ }
+
+ fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
+ use crate::runtime::{BasicScheduler, Kind};
- // Create I/O driver
- let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
- let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+ let (driver, resources) = driver::Driver::new(self.get_cfg())?;
- let spawner = Spawner::Shell;
+ // And now put a single-threaded scheduler on top of the timer. When
+ // there are no futures ready to do something, it'll let the timer or
+ // the reactor to generate some new stimuli for the futures to continue
+ // in their life.
+ let scheduler = BasicScheduler::new(driver);
+ let spawner = Spawner::Basic(scheduler.spawner().clone());
+ // Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
- kind: Kind::Shell(Shell::new(driver)),
+ kind: Kind::CurrentThread(scheduler),
handle: Handle {
spawner,
- io_handle,
- time_handle,
- clock,
+ io_handle: resources.io_handle,
+ time_handle: resources.time_handle,
+ signal_handle: resources.signal_handle,
+ clock: resources.clock,
blocking_spawner,
},
blocking_pool,
@@ -359,7 +449,7 @@ cfg_io_driver! {
/// ```
/// use tokio::runtime;
///
- /// let rt = runtime::Builder::new()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .enable_io()
/// .build()
/// .unwrap();
@@ -382,7 +472,7 @@ cfg_time! {
/// ```
/// use tokio::runtime;
///
- /// let rt = runtime::Builder::new()
+ /// let rt = runtime::Builder::new_multi_thread()
/// .enable_time()
/// .build()
/// .unwrap();
@@ -394,85 +484,19 @@ cfg_time! {
}
}
-cfg_rt_core! {
+cfg_rt_multi_thread! {
impl Builder {
- /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread.
- ///
- /// The executor and all necessary drivers will all be run on the current
- /// thread during [`block_on`] calls.
- ///
- /// See also [the module level documentation][1], which has a section on scheduler
- /// types.
- ///
- /// [1]: index.html#runtime-configurations
- /// [`block_on`]: Runtime::block_on
- pub fn basic_scheduler(&mut self) -> &mut Self {
- self.kind = Kind::Basic;
- self
- }
-
- fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
- use crate::runtime::{BasicScheduler, Kind};
-
- let clock = time::create_clock();
-
- // Create I/O driver
- let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
-
- let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
-
- // And now put a single-threaded scheduler on top of the timer. When
- // there are no futures ready to do something, it'll let the timer or
- // the reactor to generate some new stimuli for the futures to continue
- // in their life.
- let scheduler = BasicScheduler::new(driver);
- let spawner = Spawner::Basic(scheduler.spawner().clone());
-
- // Blocking pool
- let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
- let blocking_spawner = blocking_pool.spawner().clone();
-
- Ok(Runtime {
- kind: Kind::Basic(scheduler),
- handle: Handle {
- spawner,
- io_handle,
- time_handle,
- clock,
- blocking_spawner,
- },
- blocking_pool,
- })
- }
- }
-}
-
-cfg_rt_threaded! {
- impl Builder {
- /// Sets runtime to use a multi-threaded scheduler for executing tasks.
- ///
- /// See also [the module level documentation][1], which has a section on scheduler
- /// types.
- ///
- /// [1]: index.html#runtime-configurations
- pub fn threaded_scheduler(&mut self) -> &mut Self {
- self.kind = Kind::ThreadPool;
- self
- }
-
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Kind, ThreadPool};
use crate::runtime::park::Parker;
use std::cmp;
- let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
+ let core_threads = self.worker_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");
- let clock = time::create_clock();
+ let (driver, resources) = driver::Driver::new(self.get_cfg())?;
- let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
- let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
@@ -483,14 +507,16 @@ cfg_rt_threaded! {
// Create the runtime handle
let handle = Handle {
spawner,
- io_handle,
- time_handle,
- clock,
+ io_handle: resources.io_handle,
+ time_handle: resources.time_handle,
+ signal_handle: resources.signal_handle,
+ clock: resources.clock,
blocking_spawner,
};
// Spawn the thread pool workers
- handle.enter(|| launch.launch());
+ let _enter = crate::runtime::context::enter(handle.clone());
+ launch.launch();
Ok(Runtime {
kind: Kind::ThreadPool(scheduler),
@@ -501,19 +527,15 @@ cfg_rt_threaded! {
}
}
-impl Default for Builder {
- fn default() -> Self {
- Self::new()
- }
-}
-
impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
- .field("kind", &self.kind)
- .field("core_threads", &self.core_threads)
+ .field("worker_threads", &self.worker_threads)
.field("max_threads", &self.max_threads)
- .field("thread_name", &self.thread_name)
+ .field(
+ "thread_name",
+ &"<dyn Fn() -> String + Send + Sync + 'static>",
+ )
.field("thread_stack_size", &self.thread_stack_size)
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
.field("before_stop", &self.after_start.as_ref().map(|_| "..."))
diff --git a/src/runtime/context.rs b/src/runtime/context.rs
index 1b267f4..0817019 100644
--- a/src/runtime/context.rs
+++ b/src/runtime/context.rs
@@ -12,7 +12,7 @@ pub(crate) fn current() -> Option<Handle> {
}
cfg_io_driver! {
- pub(crate) fn io_handle() -> crate::runtime::io::Handle {
+ pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(),
None => Default::default(),
@@ -20,8 +20,18 @@ cfg_io_driver! {
}
}
+cfg_signal_internal! {
+ #[cfg(unix)]
+ pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => ctx.signal_handle.clone(),
+ None => Default::default(),
+ })
+ }
+}
+
cfg_time! {
- pub(crate) fn time_handle() -> crate::runtime::time::Handle {
+ pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(),
None => Default::default(),
@@ -29,7 +39,7 @@ cfg_time! {
}
cfg_test_util! {
- pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
+ pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.clock.clone()),
None => None,
@@ -38,7 +48,7 @@ cfg_time! {
}
}
-cfg_rt_core! {
+cfg_rt! {
pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.spawner.clone()),
@@ -50,24 +60,20 @@ cfg_rt_core! {
/// Set this [`Handle`] as the current active [`Handle`].
///
/// [`Handle`]: Handle
-pub(crate) fn enter<F, R>(new: Handle, f: F) -> R
-where
- F: FnOnce() -> R,
-{
- struct DropGuard(Option<Handle>);
-
- impl Drop for DropGuard {
- fn drop(&mut self) {
- CONTEXT.with(|ctx| {
- *ctx.borrow_mut() = self.0.take();
- });
- }
- }
-
- let _guard = CONTEXT.with(|ctx| {
+pub(crate) fn enter(new: Handle) -> EnterGuard {
+ CONTEXT.with(|ctx| {
let old = ctx.borrow_mut().replace(new);
- DropGuard(old)
- });
+ EnterGuard(old)
+ })
+}
+
+#[derive(Debug)]
+pub(crate) struct EnterGuard(Option<Handle>);
- f()
+impl Drop for EnterGuard {
+ fn drop(&mut self) {
+ CONTEXT.with(|ctx| {
+ *ctx.borrow_mut() = self.0.take();
+ });
+ }
}
diff --git a/src/runtime/driver.rs b/src/runtime/driver.rs
new file mode 100644
index 0000000..e89de9d
--- /dev/null
+++ b/src/runtime/driver.rs
@@ -0,0 +1,205 @@
+//! Abstracts out the entire chain of runtime sub-drivers into common types.
+use crate::park::thread::ParkThread;
+use crate::park::Park;
+
+use std::io;
+use std::time::Duration;
+
+// ===== io driver =====
+
+cfg_io_driver! {
+ type IoDriver = crate::io::driver::Driver;
+ type IoStack = crate::park::either::Either<ProcessDriver, ParkThread>;
+ pub(crate) type IoHandle = Option<crate::io::driver::Handle>;
+
+ fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
+ use crate::park::either::Either;
+
+ #[cfg(loom)]
+ assert!(!enabled);
+
+ let ret = if enabled {
+ let io_driver = crate::io::driver::Driver::new()?;
+ let io_handle = io_driver.handle();
+
+ let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
+ let process_driver = create_process_driver(signal_driver)?;
+
+ (Either::A(process_driver), Some(io_handle), signal_handle)
+ } else {
+ (Either::B(ParkThread::new()), Default::default(), Default::default())
+ };
+
+ Ok(ret)
+ }
+}
+
+cfg_not_io_driver! {
+ pub(crate) type IoHandle = ();
+ type IoStack = ParkThread;
+
+ fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
+ Ok((ParkThread::new(), Default::default(), Default::default()))
+ }
+}
+
+// ===== signal driver =====
+
+macro_rules! cfg_signal_internal_and_unix {
+ ($($item:item)*) => {
+ #[cfg(unix)]
+ cfg_signal_internal! { $($item)* }
+ }
+}
+
+cfg_signal_internal_and_unix! {
+ type SignalDriver = crate::signal::unix::driver::Driver;
+ pub(crate) type SignalHandle = Option<crate::signal::unix::driver::Handle>;
+
+ fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
+ let driver = crate::signal::unix::driver::Driver::new(io_driver)?;
+ let handle = driver.handle();
+ Ok((driver, Some(handle)))
+ }
+}
+
+cfg_not_signal_internal! {
+ pub(crate) type SignalHandle = ();
+
+ cfg_io_driver! {
+ type SignalDriver = IoDriver;
+
+ fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
+ Ok((io_driver, ()))
+ }
+ }
+}
+
+// ===== process driver =====
+
+cfg_process_driver! {
+ type ProcessDriver = crate::process::unix::driver::Driver;
+
+ fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> {
+ crate::process::unix::driver::Driver::new(signal_driver)
+ }
+}
+
+cfg_not_process_driver! {
+ cfg_io_driver! {
+ type ProcessDriver = SignalDriver;
+
+ fn create_process_driver(signal_driver: SignalDriver) -> io::Result<ProcessDriver> {
+ Ok(signal_driver)
+ }
+ }
+}
+
+// ===== time driver =====
+
+cfg_time! {
+ type TimeDriver = crate::park::either::Either<crate::time::driver::Driver<IoStack>, IoStack>;
+
+ pub(crate) type Clock = crate::time::Clock;
+ pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
+
+ fn create_clock() -> Clock {
+ crate::time::Clock::new()
+ }
+
+ fn create_time_driver(
+ enable: bool,
+ io_stack: IoStack,
+ clock: Clock,
+ ) -> (TimeDriver, TimeHandle) {
+ use crate::park::either::Either;
+
+ if enable {
+ let driver = crate::time::driver::Driver::new(io_stack, clock);
+ let handle = driver.handle();
+
+ (Either::A(driver), Some(handle))
+ } else {
+ (Either::B(io_stack), None)
+ }
+ }
+}
+
+cfg_not_time! {
+ type TimeDriver = IoStack;
+
+ pub(crate) type Clock = ();
+ pub(crate) type TimeHandle = ();
+
+ fn create_clock() -> Clock {
+ ()
+ }
+
+ fn create_time_driver(
+ _enable: bool,
+ io_stack: IoStack,
+ _clock: Clock,
+ ) -> (TimeDriver, TimeHandle) {
+ (io_stack, ())
+ }
+}
+
+// ===== runtime driver =====
+
+#[derive(Debug)]
+pub(crate) struct Driver {
+ inner: TimeDriver,
+}
+
+pub(crate) struct Resources {
+ pub(crate) io_handle: IoHandle,
+ pub(crate) signal_handle: SignalHandle,
+ pub(crate) time_handle: TimeHandle,
+ pub(crate) clock: Clock,
+}
+
+pub(crate) struct Cfg {
+ pub(crate) enable_io: bool,
+ pub(crate) enable_time: bool,
+}
+
+impl Driver {
+ pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
+ let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
+
+ let clock = create_clock();
+ let (time_driver, time_handle) =
+ create_time_driver(cfg.enable_time, io_stack, clock.clone());
+
+ Ok((
+ Self { inner: time_driver },
+ Resources {
+ io_handle,
+ signal_handle,
+ time_handle,
+ clock,
+ },
+ ))
+ }
+}
+
+impl Park for Driver {
+ type Unpark = <TimeDriver as Park>::Unpark;
+ type Error = <TimeDriver as Park>::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.inner.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park()
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.inner.park_timeout(duration)
+ }
+
+ fn shutdown(&mut self) {
+ self.inner.shutdown()
+ }
+}
diff --git a/src/runtime/enter.rs b/src/runtime/enter.rs
index 56a7c57..4dd8dd0 100644
--- a/src/runtime/enter.rs
+++ b/src/runtime/enter.rs
@@ -4,8 +4,8 @@ use std::marker::PhantomData;
#[derive(Debug, Clone, Copy)]
pub(crate) enum EnterContext {
+ #[cfg_attr(not(feature = "rt"), allow(dead_code))]
Entered {
- #[allow(dead_code)]
allow_blocking: bool,
},
NotEntered,
@@ -13,11 +13,7 @@ pub(crate) enum EnterContext {
impl EnterContext {
pub(crate) fn is_entered(self) -> bool {
- if let EnterContext::Entered { .. } = self {
- true
- } else {
- false
- }
+ matches!(self, EnterContext::Entered { .. })
}
}
@@ -28,32 +24,38 @@ pub(crate) struct Enter {
_p: PhantomData<RefCell<()>>,
}
-/// Marks the current thread as being within the dynamic extent of an
-/// executor.
-pub(crate) fn enter(allow_blocking: bool) -> Enter {
- if let Some(enter) = try_enter(allow_blocking) {
- return enter;
- }
+cfg_rt! {
+ use crate::park::thread::ParkError;
- panic!(
- "Cannot start a runtime from within a runtime. This happens \
- because a function (like `block_on`) attempted to block the \
- current thread while the thread is being used to drive \
- asynchronous tasks."
- );
-}
+ use std::time::Duration;
-/// Tries to enter a runtime context, returns `None` if already in a runtime
-/// context.
-pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> {
- ENTERED.with(|c| {
- if c.get().is_entered() {
- None
- } else {
- c.set(EnterContext::Entered { allow_blocking });
- Some(Enter { _p: PhantomData })
+ /// Marks the current thread as being within the dynamic extent of an
+ /// executor.
+ pub(crate) fn enter(allow_blocking: bool) -> Enter {
+ if let Some(enter) = try_enter(allow_blocking) {
+ return enter;
}
- })
+
+ panic!(
+ "Cannot start a runtime from within a runtime. This happens \
+ because a function (like `block_on`) attempted to block the \
+ current thread while the thread is being used to drive \
+ asynchronous tasks."
+ );
+ }
+
+ /// Tries to enter a runtime context, returns `None` if already in a runtime
+ /// context.
+ pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> {
+ ENTERED.with(|c| {
+ if c.get().is_entered() {
+ None
+ } else {
+ c.set(EnterContext::Entered { allow_blocking });
+ Some(Enter { _p: PhantomData })
+ }
+ })
+ }
}
// Forces the current "entered" state to be cleared while the closure
@@ -63,115 +65,92 @@ pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> {
//
// This is hidden for a reason. Do not use without fully understanding
// executors. Misuing can easily cause your program to deadlock.
-#[cfg(all(feature = "rt-threaded", feature = "blocking"))]
-pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
- // Reset in case the closure panics
- struct Reset(EnterContext);
- impl Drop for Reset {
- fn drop(&mut self) {
- ENTERED.with(|c| {
- assert!(!c.get().is_entered(), "closure claimed permanent executor");
- c.set(self.0);
- });
+cfg_rt_multi_thread! {
+ pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
+ // Reset in case the closure panics
+ struct Reset(EnterContext);
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ ENTERED.with(|c| {
+ assert!(!c.get().is_entered(), "closure claimed permanent executor");
+ c.set(self.0);
+ });
+ }
}
- }
- let was = ENTERED.with(|c| {
- let e = c.get();
- assert!(e.is_entered(), "asked to exit when not entered");
- c.set(EnterContext::NotEntered);
- e
- });
+ let was = ENTERED.with(|c| {
+ let e = c.get();
+ assert!(e.is_entered(), "asked to exit when not entered");
+ c.set(EnterContext::NotEntered);
+ e
+ });
- let _reset = Reset(was);
- // dropping _reset after f() will reset ENTERED
- f()
+ let _reset = Reset(was);
+ // dropping _reset after f() will reset ENTERED
+ f()
+ }
}
-cfg_rt_core! {
- cfg_rt_util! {
- /// Disallow blocking in the current runtime context until the guard is dropped.
- pub(crate) fn disallow_blocking() -> DisallowBlockingGuard {
- let reset = ENTERED.with(|c| {
- if let EnterContext::Entered {
- allow_blocking: true,
- } = c.get()
- {
- c.set(EnterContext::Entered {
- allow_blocking: false,
- });
- true
- } else {
- false
- }
- });
- DisallowBlockingGuard(reset)
- }
+cfg_rt! {
+ /// Disallow blocking in the current runtime context until the guard is dropped.
+ pub(crate) fn disallow_blocking() -> DisallowBlockingGuard {
+ let reset = ENTERED.with(|c| {
+ if let EnterContext::Entered {
+ allow_blocking: true,
+ } = c.get()
+ {
+ c.set(EnterContext::Entered {
+ allow_blocking: false,
+ });
+ true
+ } else {
+ false
+ }
+ });
+ DisallowBlockingGuard(reset)
+ }
- pub(crate) struct DisallowBlockingGuard(bool);
- impl Drop for DisallowBlockingGuard {
- fn drop(&mut self) {
- if self.0 {
- // XXX: Do we want some kind of assertion here, or is "best effort" okay?
- ENTERED.with(|c| {
- if let EnterContext::Entered {
- allow_blocking: false,
- } = c.get()
- {
- c.set(EnterContext::Entered {
- allow_blocking: true,
- });
- }
- })
- }
+ pub(crate) struct DisallowBlockingGuard(bool);
+ impl Drop for DisallowBlockingGuard {
+ fn drop(&mut self) {
+ if self.0 {
+ // XXX: Do we want some kind of assertion here, or is "best effort" okay?
+ ENTERED.with(|c| {
+ if let EnterContext::Entered {
+ allow_blocking: false,
+ } = c.get()
+ {
+ c.set(EnterContext::Entered {
+ allow_blocking: true,
+ });
+ }
+ })
}
}
}
}
-cfg_rt_threaded! {
- cfg_blocking! {
- /// Returns true if in a runtime context.
- pub(crate) fn context() -> EnterContext {
- ENTERED.with(|c| c.get())
- }
+cfg_rt_multi_thread! {
+ /// Returns true if in a runtime context.
+ pub(crate) fn context() -> EnterContext {
+ ENTERED.with(|c| c.get())
}
}
-cfg_block_on! {
+cfg_rt! {
impl Enter {
/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
- pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, crate::park::ParkError>
+ pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, ParkError>
where
F: std::future::Future,
{
- use crate::park::{CachedParkThread, Park};
- use std::task::Context;
- use std::task::Poll::Ready;
+ use crate::park::thread::CachedParkThread;
let mut park = CachedParkThread::new();
- let waker = park.get_unpark()?.into_waker();
- let mut cx = Context::from_waker(&waker);
-
- pin!(f);
-
- loop {
- if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
- return Ok(v);
- }
-
- park.park()?;
- }
+ park.block_on(f)
}
- }
-}
-cfg_blocking_impl! {
- use crate::park::ParkError;
- use std::time::Duration;
-
- impl Enter {
/// Blocks the thread on the specified future for **at most** `timeout`
///
/// If the future completes before `timeout`, the result is returned. If
@@ -180,7 +159,8 @@ cfg_blocking_impl! {
where
F: std::future::Future,
{
- use crate::park::{CachedParkThread, Park};
+ use crate::park::Park;
+ use crate::park::thread::CachedParkThread;
use std::task::Context;
use std::task::Poll::Ready;
use std::time::Instant;
diff --git a/src/runtime/handle.rs b/src/runtime/handle.rs
index 0716a7f..b1e8d8f 100644
--- a/src/runtime/handle.rs
+++ b/src/runtime/handle.rs
@@ -1,16 +1,6 @@
-use crate::runtime::{blocking, context, io, time, Spawner};
-use std::{error, fmt};
-
-cfg_blocking! {
- use crate::runtime::task;
- use crate::runtime::blocking::task::BlockingTask;
-}
-
-cfg_rt_core! {
- use crate::task::JoinHandle;
-
- use std::future::Future;
-}
+use crate::runtime::blocking::task::BlockingTask;
+use crate::runtime::task::{self, JoinHandle};
+use crate::runtime::{blocking, driver, Spawner};
/// Handle to the runtime.
///
@@ -19,353 +9,56 @@ cfg_rt_core! {
///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
-pub struct Handle {
+pub(crate) struct Handle {
pub(super) spawner: Spawner,
/// Handles to the I/O drivers
- pub(super) io_handle: io::Handle,
+ pub(super) io_handle: driver::IoHandle,
+
+ /// Handles to the signal drivers
+ pub(super) signal_handle: driver::SignalHandle,
/// Handles to the time drivers
- pub(super) time_handle: time::Handle,
+ pub(super) time_handle: driver::TimeHandle,
/// Source of `Instant::now()`
- pub(super) clock: time::Clock,
+ pub(super) clock: driver::Clock,
/// Blocking pool spawner
pub(super) blocking_spawner: blocking::Spawner,
}
impl Handle {
- /// Enter the runtime context. This allows you to construct types that must
- /// have an executor available on creation such as [`Delay`] or [`TcpStream`].
- /// It will also allow you to call methods such as [`tokio::spawn`].
- ///
- /// This function is also available as [`Runtime::enter`].
- ///
- /// [`Delay`]: struct@crate::time::Delay
- /// [`TcpStream`]: struct@crate::net::TcpStream
- /// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
- /// [`tokio::spawn`]: fn@crate::spawn
- ///
- /// # Example
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// fn function_that_spawns(msg: String) {
- /// // Had we not used `handle.enter` below, this would panic.
- /// tokio::spawn(async move {
- /// println!("{}", msg);
- /// });
- /// }
- ///
- /// fn main() {
- /// let rt = Runtime::new().unwrap();
- /// let handle = rt.handle().clone();
- ///
- /// let s = "Hello World!".to_string();
- ///
- /// // By entering the context, we tie `tokio::spawn` to this executor.
- /// handle.enter(|| function_that_spawns(s));
- /// }
- /// ```
- pub fn enter<F, R>(&self, f: F) -> R
+ // /// Enter the runtime context. This allows you to construct types that must
+ // /// have an executor available on creation such as [`Sleep`] or [`TcpStream`].
+ // /// It will also allow you to call methods such as [`tokio::spawn`].
+ // pub(crate) fn enter<F, R>(&self, f: F) -> R
+ // where
+ // F: FnOnce() -> R,
+ // {
+ // context::enter(self.clone(), f)
+ // }
+
+ /// Run the provided function on an executor dedicated to blocking operations.
+ pub(crate) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
- F: FnOnce() -> R,
+ F: FnOnce() -> R + Send + 'static,
{
- context::enter(self.clone(), f)
- }
-
- /// Returns a `Handle` view over the currently running `Runtime`
- ///
- /// # Panic
- ///
- /// This will panic if called outside the context of a Tokio runtime. That means that you must
- /// call this on one of the threads **being run by the runtime**. Calling this from within a
- /// thread created by `std::thread::spawn` (for example) will cause a panic.
- ///
- /// # Examples
- ///
- /// This can be used to obtain the handle of the surrounding runtime from an async
- /// block or function running on that runtime.
- ///
- /// ```
- /// # use std::thread;
- /// # use tokio::runtime::Runtime;
- /// # fn dox() {
- /// # let rt = Runtime::new().unwrap();
- /// # rt.spawn(async {
- /// use tokio::runtime::Handle;
- ///
- /// // Inside an async block or function.
- /// let handle = Handle::current();
- /// handle.spawn(async {
- /// println!("now running in the existing Runtime");
- /// });
- ///
- /// # let handle =
- /// thread::spawn(move || {
- /// // Notice that the handle is created outside of this thread and then moved in
- /// handle.block_on(async { /* ... */ })
- /// // This next line would cause a panic
- /// // let handle2 = Handle::current();
- /// });
- /// # handle.join().unwrap();
- /// # });
- /// # }
- /// ```
- pub fn current() -> Self {
- context::current().expect("not currently running on the Tokio runtime.")
- }
-
- /// Returns a Handle view over the currently running Runtime
- ///
- /// Returns an error if no Runtime has been started
- ///
- /// Contrary to `current`, this never panics
- pub fn try_current() -> Result<Self, TryCurrentError> {
- context::current().ok_or(TryCurrentError(()))
+ #[cfg(feature = "tracing")]
+ let func = {
+ let span = tracing::trace_span!(
+ target: "tokio::task",
+ "task",
+ kind = %"blocking",
+ function = %std::any::type_name::<F>(),
+ );
+ move || {
+ let _g = span.enter();
+ func()
+ }
+ };
+ let (task, handle) = task::joinable(BlockingTask::new(func));
+ let _ = self.blocking_spawner.spawn(task, &self);
+ handle
}
}
-
-cfg_rt_core! {
- impl Handle {
- /// Spawns a future onto the Tokio runtime.
- ///
- /// This spawns the given future onto the runtime's executor, usually a
- /// thread pool. The thread pool is then responsible for polling the future
- /// until it completes.
- ///
- /// See [module level][mod] documentation for more details.
- ///
- /// [mod]: index.html
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// # fn dox() {
- /// // Create the runtime
- /// let rt = Runtime::new().unwrap();
- /// let handle = rt.handle();
- ///
- /// // Spawn a future onto the runtime
- /// handle.spawn(async {
- /// println!("now running on a worker thread");
- /// });
- /// # }
- /// ```
- ///
- /// # Panics
- ///
- /// This function will not panic unless task execution is disabled on the
- /// executor. This can only happen if the runtime was built using
- /// [`Builder`] without picking either [`basic_scheduler`] or
- /// [`threaded_scheduler`].
- ///
- /// [`Builder`]: struct@crate::runtime::Builder
- /// [`threaded_scheduler`]: fn@crate::runtime::Builder::threaded_scheduler
- /// [`basic_scheduler`]: fn@crate::runtime::Builder::basic_scheduler
- pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- self.spawner.spawn(future)
- }
-
- /// Run a future to completion on the Tokio runtime from a synchronous
- /// context.
- ///
- /// This runs the given future on the runtime, blocking until it is
- /// complete, and yielding its resolved result. Any tasks or timers which
- /// the future spawns internally will be executed on the runtime.
- ///
- /// If the provided executor currently has no active core thread, this
- /// function might hang until a core thread is added. This is not a
- /// concern when using the [threaded scheduler], as it always has active
- /// core threads, but if you use the [basic scheduler], some other
- /// thread must currently be inside a call to [`Runtime::block_on`].
- /// See also [the module level documentation][1], which has a section on
- /// scheduler types.
- ///
- /// This method may not be called from an asynchronous context.
- ///
- /// [threaded scheduler]: fn@crate::runtime::Builder::threaded_scheduler
- /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler
- /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
- /// [1]: index.html#runtime-configurations
- ///
- /// # Panics
- ///
- /// This function panics if the provided future panics, or if called
- /// within an asynchronous execution context.
- ///
- /// # Examples
- ///
- /// Using `block_on` with the [threaded scheduler].
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- /// use std::thread;
- ///
- /// // Create the runtime.
- /// //
- /// // If the rt-threaded feature is enabled, this creates a threaded
- /// // scheduler by default.
- /// let rt = Runtime::new().unwrap();
- /// let handle = rt.handle().clone();
- ///
- /// // Use the runtime from another thread.
- /// let th = thread::spawn(move || {
- /// // Execute the future, blocking the current thread until completion.
- /// //
- /// // This example uses the threaded scheduler, so no concurrent call to
- /// // `rt.block_on` is required.
- /// handle.block_on(async {
- /// println!("hello");
- /// });
- /// });
- ///
- /// th.join().unwrap();
- /// ```
- ///
- /// Using the [basic scheduler] requires a concurrent call to
- /// [`Runtime::block_on`]:
- ///
- /// [threaded scheduler]: fn@crate::runtime::Builder::threaded_scheduler
- /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler
- /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
- ///
- /// ```
- /// use tokio::runtime::Builder;
- /// use tokio::sync::oneshot;
- /// use std::thread;
- ///
- /// // Create the runtime.
- /// let mut rt = Builder::new()
- /// .enable_all()
- /// .basic_scheduler()
- /// .build()
- /// .unwrap();
- ///
- /// let handle = rt.handle().clone();
- ///
- /// // Signal main thread when task has finished.
- /// let (send, recv) = oneshot::channel();
- ///
- /// // Use the runtime from another thread.
- /// let th = thread::spawn(move || {
- /// // Execute the future, blocking the current thread until completion.
- /// handle.block_on(async {
- /// send.send("done").unwrap();
- /// });
- /// });
- ///
- /// // The basic scheduler is used, so the thread above might hang if we
- /// // didn't call block_on on the rt too.
- /// rt.block_on(async {
- /// assert_eq!(recv.await.unwrap(), "done");
- /// });
- /// # th.join().unwrap();
- /// ```
- ///
- pub fn block_on<F: Future>(&self, future: F) -> F::Output {
- self.enter(|| {
- let mut enter = crate::runtime::enter(true);
- enter.block_on(future).expect("failed to park thread")
- })
- }
- }
-}
-
-cfg_blocking! {
- impl Handle {
- /// Runs the provided closure on a thread where blocking is acceptable.
- ///
- /// In general, issuing a blocking call or performing a lot of compute in a
- /// future without yielding is not okay, as it may prevent the executor from
- /// driving other futures forward. This function runs the provided closure
- /// on a thread dedicated to blocking operations. See the [CPU-bound tasks
- /// and blocking code][blocking] section for more information.
- ///
- /// Tokio will spawn more blocking threads when they are requested through
- /// this function until the upper limit configured on the [`Builder`] is
- /// reached. This limit is very large by default, because `spawn_blocking` is
- /// often used for various kinds of IO operations that cannot be performed
- /// asynchronously. When you run CPU-bound code using `spawn_blocking`, you
- /// should keep this large upper limit in mind; to run your CPU-bound
- /// computations on only a few threads, you should use a separate thread
- /// pool such as [rayon] rather than configuring the number of blocking
- /// threads.
- ///
- /// This function is intended for non-async operations that eventually
- /// finish on their own. If you want to spawn an ordinary thread, you should
- /// use [`thread::spawn`] instead.
- ///
- /// Closures spawned using `spawn_blocking` cannot be cancelled. When you
- /// shut down the executor, it will wait indefinitely for all blocking
- /// operations to finish. You can use [`shutdown_timeout`] to stop waiting
- /// for them after a certain timeout. Be aware that this will still not
- /// cancel the tasks — they are simply allowed to keep running after the
- /// method returns.
- ///
- /// Note that if you are using the [basic scheduler], this function will
- /// still spawn additional threads for blocking operations. The basic
- /// scheduler's single thread is only used for asynchronous code.
- ///
- /// [`Builder`]: struct@crate::runtime::Builder
- /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code
- /// [rayon]: https://docs.rs/rayon
- /// [basic scheduler]: fn@crate::runtime::Builder::basic_scheduler
- /// [`thread::spawn`]: fn@std::thread::spawn
- /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// # async fn docs() -> Result<(), Box<dyn std::error::Error>>{
- /// // Create the runtime
- /// let rt = Runtime::new().unwrap();
- /// let handle = rt.handle();
- ///
- /// let res = handle.spawn_blocking(move || {
- /// // do some compute-heavy work or call synchronous code
- /// "done computing"
- /// }).await?;
- ///
- /// assert_eq!(res, "done computing");
- /// # Ok(())
- /// # }
- /// ```
- pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
- where
- F: FnOnce() -> R + Send + 'static,
- R: Send + 'static,
- {
- let (task, handle) = task::joinable(BlockingTask::new(f));
- let _ = self.blocking_spawner.spawn(task, self);
- handle
- }
- }
-}
-
-/// Error returned by `try_current` when no Runtime has been started
-pub struct TryCurrentError(());
-
-impl fmt::Debug for TryCurrentError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TryCurrentError").finish()
- }
-}
-
-impl fmt::Display for TryCurrentError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.write_str("no tokio Runtime has been initialized")
- }
-}
-
-impl error::Error for TryCurrentError {}
diff --git a/src/runtime/io.rs b/src/runtime/io.rs
deleted file mode 100644
index 6a0953a..0000000
--- a/src/runtime/io.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O
-//! driver. When the `time` feature flag is **not** enabled. These APIs are
-//! shells. This isolates the complexity of dealing with conditional
-//! compilation.
-
-/// Re-exported for convenience.
-pub(crate) use std::io::Result;
-
-pub(crate) use variant::*;
-
-#[cfg(feature = "io-driver")]
-mod variant {
- use crate::io::driver;
- use crate::park::{Either, ParkThread};
-
- use std::io;
-
- /// The driver value the runtime passes to the `timer` layer.
- ///
- /// When the `io-driver` feature is enabled, this is the "real" I/O driver
- /// backed by Mio. Without the `io-driver` feature, this is a thread parker
- /// backed by a condition variable.
- pub(crate) type Driver = Either<driver::Driver, ParkThread>;
-
- /// The handle the runtime stores for future use.
- ///
- /// When the `io-driver` feature is **not** enabled, this is `()`.
- pub(crate) type Handle = Option<driver::Handle>;
-
- pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> {
- #[cfg(loom)]
- assert!(!enable);
-
- if enable {
- let driver = driver::Driver::new()?;
- let handle = driver.handle();
-
- Ok((Either::A(driver), Some(handle)))
- } else {
- let driver = ParkThread::new();
- Ok((Either::B(driver), None))
- }
- }
-}
-
-#[cfg(not(feature = "io-driver"))]
-mod variant {
- use crate::park::ParkThread;
-
- use std::io;
-
- /// I/O is not enabled, use a condition variable based parker
- pub(crate) type Driver = ParkThread;
-
- /// There is no handle
- pub(crate) type Handle = ();
-
- pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> {
- let driver = ParkThread::new();
-
- Ok((driver, ()))
- }
-}
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 300a146..be4aa38 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -1,8 +1,7 @@
//! The Tokio runtime.
//!
-//! Unlike other Rust programs, asynchronous applications require
-//! runtime support. In particular, the following runtime services are
-//! necessary:
+//! Unlike other Rust programs, asynchronous applications require runtime
+//! support. In particular, the following runtime services are necessary:
//!
//! * An **I/O event loop**, called the driver, which drives I/O resources and
//! dispatches I/O events to tasks that depend on them.
@@ -10,14 +9,14 @@
//! * A **timer** for scheduling work to run after a set period of time.
//!
//! Tokio's [`Runtime`] bundles all of these services as a single type, allowing
-//! them to be started, shut down, and configured together. However, most
-//! applications won't need to use [`Runtime`] directly. Instead, they can
-//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under
-//! the hood.
+//! them to be started, shut down, and configured together. However, often it is
+//! not required to configure a [`Runtime`] manually, and user may just use the
+//! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood.
//!
//! # Usage
//!
-//! Most applications will use the [`tokio::main`] attribute macro.
+//! When no fine tuning is required, the [`tokio::main`] attribute macro can be
+//! used.
//!
//! ```no_run
//! use tokio::net::TcpListener;
@@ -25,7 +24,7 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
-//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;
@@ -69,11 +68,11 @@
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Create the runtime
-//! let mut rt = Runtime::new()?;
+//! let rt = Runtime::new()?;
//!
//! // Spawn the root task
//! rt.block_on(async {
-//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//! loop {
//! let (mut socket, _) = listener.accept().await?;
@@ -111,48 +110,38 @@
//! applications. The [runtime builder] or `#[tokio::main]` attribute may be
//! used to select which scheduler to use.
//!
-//! #### Basic Scheduler
+//! #### Multi-Thread Scheduler
//!
-//! The basic scheduler provides a _single-threaded_ future executor. All tasks
-//! will be created and executed on the current thread. The basic scheduler
-//! requires the `rt-core` feature flag, and can be selected using the
-//! [`Builder::basic_scheduler`] method:
+//! The multi-thread scheduler executes futures on a _thread pool_, using a
+//! work-stealing strategy. By default, it will start a worker thread for each
+//! CPU core available on the system. This tends to be the ideal configurations
+//! for most applications. The multi-thread scheduler requires the `rt-multi-thread`
+//! feature flag, and is selected by default:
//! ```
//! use tokio::runtime;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
-//! let basic_rt = runtime::Builder::new()
-//! .basic_scheduler()
-//! .build()?;
+//! let threaded_rt = runtime::Runtime::new()?;
//! # Ok(()) }
//! ```
//!
-//! If the `rt-core` feature is enabled and `rt-threaded` is not,
-//! [`Runtime::new`] will return a basic scheduler runtime by default.
+//! Most applications should use the multi-thread scheduler, except in some
+//! niche use-cases, such as when running only a single thread is required.
//!
-//! #### Threaded Scheduler
+//! #### Current-Thread Scheduler
//!
-//! The threaded scheduler executes futures on a _thread pool_, using a
-//! work-stealing strategy. By default, it will start a worker thread for each
-//! CPU core available on the system. This tends to be the ideal configurations
-//! for most applications. The threaded scheduler requires the `rt-threaded` feature
-//! flag, and can be selected using the [`Builder::threaded_scheduler`] method:
+//! The current-thread scheduler provides a _single-threaded_ future executor.
+//! All tasks will be created and executed on the current thread. This requires
+//! the `rt` feature flag.
//! ```
//! use tokio::runtime;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
-//! let threaded_rt = runtime::Builder::new()
-//! .threaded_scheduler()
+//! let basic_rt = runtime::Builder::new_current_thread()
//! .build()?;
//! # Ok(()) }
//! ```
//!
-//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a
-//! threaded scheduler runtime by default.
-//!
-//! Most applications should use the threaded scheduler, except in some niche
-//! use-cases, such as when running only a single thread is required.
-//!
//! #### Resource drivers
//!
//! When configuring a runtime by hand, no resource drivers are enabled by
@@ -164,8 +153,8 @@
//! ## Lifetime of spawned threads
//!
//! The runtime may spawn threads depending on its configuration and usage. The
-//! threaded scheduler spawns threads to schedule tasks and calls to
-//! `spawn_blocking` spawn threads to run blocking operations.
+//! multi-thread scheduler spawns threads to schedule tasks and for `spawn_blocking`
+//! calls.
//!
//! While the `Runtime` is active, threads may shutdown after periods of being
//! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown.
@@ -188,394 +177,380 @@
#[macro_use]
mod tests;
-pub(crate) mod context;
+pub(crate) mod enter;
+
+pub(crate) mod task;
-cfg_rt_core! {
+cfg_rt! {
mod basic_scheduler;
use basic_scheduler::BasicScheduler;
- pub(crate) mod task;
-}
-
-mod blocking;
-use blocking::BlockingPool;
+ mod blocking;
+ use blocking::BlockingPool;
+ pub(crate) use blocking::spawn_blocking;
-cfg_blocking_impl! {
- #[allow(unused_imports)]
- pub(crate) use blocking::{spawn_blocking, try_spawn_blocking};
-}
+ mod builder;
+ pub use self::builder::Builder;
-mod builder;
-pub use self::builder::Builder;
+ pub(crate) mod context;
+ pub(crate) mod driver;
-pub(crate) mod enter;
-use self::enter::enter;
+ use self::enter::enter;
-mod handle;
-pub use self::handle::{Handle, TryCurrentError};
+ mod handle;
+ use handle::Handle;
-mod io;
+ mod spawner;
+ use self::spawner::Spawner;
+}
-cfg_rt_threaded! {
+cfg_rt_multi_thread! {
mod park;
use park::Parker;
}
-mod shell;
-use self::shell::Shell;
-
-mod spawner;
-use self::spawner::Spawner;
-
-mod time;
-
-cfg_rt_threaded! {
+cfg_rt_multi_thread! {
mod queue;
pub(crate) mod thread_pool;
use self::thread_pool::ThreadPool;
}
-cfg_rt_core! {
+cfg_rt! {
use crate::task::JoinHandle;
-}
-
-use std::future::Future;
-use std::time::Duration;
-
-/// The Tokio runtime.
-///
-/// The runtime provides an I/O driver, task scheduler, [timer], and blocking
-/// pool, necessary for running asynchronous tasks.
-///
-/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
-/// most users will use the `#[tokio::main]` annotation on their entry point instead.
-///
-/// See [module level][mod] documentation for more details.
-///
-/// # Shutdown
-///
-/// Shutting down the runtime is done by dropping the value. The current thread
-/// will block until the shut down operation has completed.
-///
-/// * Drain any scheduled work queues.
-/// * Drop any futures that have not yet completed.
-/// * Drop the reactor.
-///
-/// Once the reactor has dropped, any outstanding I/O resources bound to
-/// that reactor will no longer function. Calling any method on them will
-/// result in an error.
-///
-/// [timer]: crate::time
-/// [mod]: index.html
-/// [`new`]: method@Self::new
-/// [`Builder`]: struct@Builder
-/// [`tokio::run`]: fn@run
-#[derive(Debug)]
-pub struct Runtime {
- /// Task executor
- kind: Kind,
-
- /// Handle to runtime, also contains driver handles
- handle: Handle,
-
- /// Blocking pool handle, used to signal shutdown
- blocking_pool: BlockingPool,
-}
-
-/// The runtime executor is either a thread-pool or a current-thread executor.
-#[derive(Debug)]
-enum Kind {
- /// Not able to execute concurrent tasks. This variant is mostly used to get
- /// access to the driver handles.
- Shell(Shell),
- /// Execute all tasks on the current-thread.
- #[cfg(feature = "rt-core")]
- Basic(BasicScheduler<time::Driver>),
+ use std::future::Future;
+ use std::time::Duration;
- /// Execute tasks across multiple threads.
- #[cfg(feature = "rt-threaded")]
- ThreadPool(ThreadPool),
-}
-
-/// After thread starts / before thread stops
-type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
-
-impl Runtime {
- /// Create a new runtime instance with default configuration values.
+ /// The Tokio runtime.
///
- /// This results in a scheduler, I/O driver, and time driver being
- /// initialized. The type of scheduler used depends on what feature flags
- /// are enabled: if the `rt-threaded` feature is enabled, the [threaded
- /// scheduler] is used, while if only the `rt-core` feature is enabled, the
- /// [basic scheduler] is used instead.
+ /// The runtime provides an I/O driver, task scheduler, [timer], and
+ /// blocking pool, necessary for running asynchronous tasks.
///
- /// If the threaded scheduler is selected, it will not spawn
- /// any worker threads until it needs to, i.e. tasks are scheduled to run.
- ///
- /// Most applications will not need to call this function directly. Instead,
- /// they will use the [`#[tokio::main]` attribute][main]. When more complex
- /// configuration is necessary, the [runtime builder] may be used.
+ /// Instances of `Runtime` can be created using [`new`], or [`Builder`].
+ /// However, most users will use the `#[tokio::main]` annotation on their
+ /// entry point instead.
///
/// See [module level][mod] documentation for more details.
///
- /// # Examples
- ///
- /// Creating a new `Runtime` with default configuration values.
+ /// # Shutdown
///
- /// ```
- /// use tokio::runtime::Runtime;
+ /// Shutting down the runtime is done by dropping the value. The current
+ /// thread will block until the shut down operation has completed.
///
- /// let rt = Runtime::new()
- /// .unwrap();
+ /// * Drain any scheduled work queues.
+ /// * Drop any futures that have not yet completed.
+ /// * Drop the reactor.
///
- /// // Use the runtime...
- /// ```
+ /// Once the reactor has dropped, any outstanding I/O resources bound to
+ /// that reactor will no longer function. Calling any method on them will
+ /// result in an error.
///
- /// [mod]: index.html
- /// [main]: ../attr.main.html
- /// [threaded scheduler]: index.html#threaded-scheduler
- /// [basic scheduler]: index.html#basic-scheduler
- /// [runtime builder]: crate::runtime::Builder
- pub fn new() -> io::Result<Runtime> {
- #[cfg(feature = "rt-threaded")]
- let ret = Builder::new().threaded_scheduler().enable_all().build();
-
- #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))]
- let ret = Builder::new().basic_scheduler().enable_all().build();
-
- #[cfg(not(feature = "rt-core"))]
- let ret = Builder::new().enable_all().build();
-
- ret
- }
-
- /// Spawn a future onto the Tokio runtime.
+ /// # Sharing
///
- /// This spawns the given future onto the runtime's executor, usually a
- /// thread pool. The thread pool is then responsible for polling the future
- /// until it completes.
+ /// The Tokio runtime implements `Sync` and `Send` to allow you to wrap it
+ /// in a `Arc`. Most fn take `&self` to allow you to call them concurrently
+ /// accross multiple threads.
///
- /// See [module level][mod] documentation for more details.
+ /// Calls to `shutdown` and `shutdown_timeout` require exclusive ownership of
+ /// the runtime type and this can be achieved via `Arc::try_unwrap` when only
+ /// one strong count reference is left over.
///
+ /// [timer]: crate::time
/// [mod]: index.html
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// # fn dox() {
- /// // Create the runtime
- /// let rt = Runtime::new().unwrap();
- ///
- /// // Spawn a future onto the runtime
- /// rt.spawn(async {
- /// println!("now running on a worker thread");
- /// });
- /// # }
- /// ```
- ///
- /// # Panics
- ///
- /// This function will not panic unless task execution is disabled on the
- /// executor. This can only happen if the runtime was built using
- /// [`Builder`] without picking either [`basic_scheduler`] or
- /// [`threaded_scheduler`].
- ///
+ /// [`new`]: method@Self::new
/// [`Builder`]: struct@Builder
- /// [`threaded_scheduler`]: fn@Builder::threaded_scheduler
- /// [`basic_scheduler`]: fn@Builder::basic_scheduler
- #[cfg(feature = "rt-core")]
- pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- match &self.kind {
- Kind::Shell(_) => panic!("task execution disabled"),
- #[cfg(feature = "rt-threaded")]
- Kind::ThreadPool(exec) => exec.spawn(future),
- Kind::Basic(exec) => exec.spawn(future),
- }
- }
+ #[derive(Debug)]
+ pub struct Runtime {
+ /// Task executor
+ kind: Kind,
- /// Run a future to completion on the Tokio runtime. This is the runtime's
- /// entry point.
- ///
- /// This runs the given future on the runtime, blocking until it is
- /// complete, and yielding its resolved result. Any tasks or timers which
- /// the future spawns internally will be executed on the runtime.
- ///
- /// `&mut` is required as calling `block_on` **may** result in advancing the
- /// state of the runtime. The details depend on how the runtime is
- /// configured. [`runtime::Handle::block_on`][handle] provides a version
- /// that takes `&self`.
- ///
- /// This method may not be called from an asynchronous context.
- ///
- /// # Panics
- ///
- /// This function panics if the provided future panics, or if called within an
- /// asynchronous execution context.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::runtime::Runtime;
- ///
- /// // Create the runtime
- /// let mut rt = Runtime::new().unwrap();
- ///
- /// // Execute the future, blocking the current thread until completion
- /// rt.block_on(async {
- /// println!("hello");
- /// });
- /// ```
- ///
- /// [handle]: fn@Handle::block_on
- pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
- let kind = &mut self.kind;
+ /// Handle to runtime, also contains driver handles
+ handle: Handle,
- self.handle.enter(|| match kind {
- Kind::Shell(exec) => exec.block_on(future),
- #[cfg(feature = "rt-core")]
- Kind::Basic(exec) => exec.block_on(future),
- #[cfg(feature = "rt-threaded")]
- Kind::ThreadPool(exec) => exec.block_on(future),
- })
+ /// Blocking pool handle, used to signal shutdown
+ blocking_pool: BlockingPool,
}
- /// Enter the runtime context. This allows you to construct types that must
- /// have an executor available on creation such as [`Delay`] or [`TcpStream`].
- /// It will also allow you to call methods such as [`tokio::spawn`].
- ///
- /// This function is also available as [`Handle::enter`].
- ///
- /// [`Delay`]: struct@crate::time::Delay
- /// [`TcpStream`]: struct@crate::net::TcpStream
- /// [`Handle::enter`]: fn@crate::runtime::Handle::enter
- /// [`tokio::spawn`]: fn@crate::spawn
- ///
- /// # Example
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// fn function_that_spawns(msg: String) {
- /// // Had we not used `rt.enter` below, this would panic.
- /// tokio::spawn(async move {
- /// println!("{}", msg);
- /// });
- /// }
+ /// Runtime context guard.
///
- /// fn main() {
- /// let rt = Runtime::new().unwrap();
- ///
- /// let s = "Hello World!".to_string();
- ///
- /// // By entering the context, we tie `tokio::spawn` to this executor.
- /// rt.enter(|| function_that_spawns(s));
- /// }
- /// ```
- pub fn enter<F, R>(&self, f: F) -> R
- where
- F: FnOnce() -> R,
- {
- self.handle.enter(f)
+ /// Returned by [`Runtime::enter`], the context guard exits the runtime
+ /// context on drop.
+ #[derive(Debug)]
+ pub struct EnterGuard<'a> {
+ rt: &'a Runtime,
+ guard: context::EnterGuard,
}
- /// Return a handle to the runtime's spawner.
- ///
- /// The returned handle can be used to spawn tasks that run on this runtime, and can
- /// be cloned to allow moving the `Handle` to other threads.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// let rt = Runtime::new()
- /// .unwrap();
- ///
- /// let handle = rt.handle();
- ///
- /// handle.spawn(async { println!("hello"); });
- /// ```
- pub fn handle(&self) -> &Handle {
- &self.handle
- }
+ /// The runtime executor is either a thread-pool or a current-thread executor.
+ #[derive(Debug)]
+ enum Kind {
+ /// Execute all tasks on the current-thread.
+ CurrentThread(BasicScheduler<driver::Driver>),
- /// Shutdown the runtime, waiting for at most `duration` for all spawned
- /// task to shutdown.
- ///
- /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to
- /// shutdown in a timely fashion. However, dropping a `Runtime` will wait
- /// indefinitely for all tasks to terminate, and there are cases where a long
- /// blocking task has been spawned, which can block dropping `Runtime`.
- ///
- /// In this case, calling `shutdown_timeout` with an explicit wait timeout
- /// can work. The `shutdown_timeout` will signal all tasks to shutdown and
- /// will wait for at most `duration` for all spawned tasks to terminate. If
- /// `timeout` elapses before all tasks are dropped, the function returns and
- /// outstanding tasks are potentially leaked.
- ///
- /// # Examples
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- /// use tokio::task;
- ///
- /// use std::thread;
- /// use std::time::Duration;
- ///
- /// fn main() {
- /// let mut runtime = Runtime::new().unwrap();
- ///
- /// runtime.block_on(async move {
- /// task::spawn_blocking(move || {
- /// thread::sleep(Duration::from_secs(10_000));
- /// });
- /// });
- ///
- /// runtime.shutdown_timeout(Duration::from_millis(100));
- /// }
- /// ```
- pub fn shutdown_timeout(self, duration: Duration) {
- let Runtime {
- mut blocking_pool, ..
- } = self;
- blocking_pool.shutdown(Some(duration));
+ /// Execute tasks across multiple threads.
+ #[cfg(feature = "rt-multi-thread")]
+ ThreadPool(ThreadPool),
}
- /// Shutdown the runtime, without waiting for any spawned tasks to shutdown.
- ///
- /// This can be useful if you want to drop a runtime from within another runtime.
- /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
- /// to complete, which would normally not be permitted within an asynchronous context.
- /// By calling `shutdown_background()`, you can drop the runtime from such a context.
- ///
- /// Note however, that because we do not wait for any blocking tasks to complete, this
- /// may result in a resource leak (in that any blocking tasks are still running until they
- /// return.
- ///
- /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`.
- ///
- /// ```
- /// use tokio::runtime::Runtime;
- ///
- /// fn main() {
- /// let mut runtime = Runtime::new().unwrap();
- ///
- /// runtime.block_on(async move {
- /// let inner_runtime = Runtime::new().unwrap();
- /// // ...
- /// inner_runtime.shutdown_background();
- /// });
- /// }
- /// ```
- pub fn shutdown_background(self) {
- self.shutdown_timeout(Duration::from_nanos(0))
+ /// After thread starts / before thread stops
+ type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
+
+ impl Runtime {
+ /// Create a new runtime instance with default configuration values.
+ ///
+ /// This results in the multi threaded scheduler, I/O driver, and time driver being
+ /// initialized.
+ ///
+ /// Most applications will not need to call this function directly. Instead,
+ /// they will use the [`#[tokio::main]` attribute][main]. When a more complex
+ /// configuration is necessary, the [runtime builder] may be used.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// # Examples
+ ///
+ /// Creating a new `Runtime` with default configuration values.
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// // Use the runtime...
+ /// ```
+ ///
+ /// [mod]: index.html
+ /// [main]: ../attr.main.html
+ /// [threaded scheduler]: index.html#threaded-scheduler
+ /// [basic scheduler]: index.html#basic-scheduler
+ /// [runtime builder]: crate::runtime::Builder
+ #[cfg(feature = "rt-multi-thread")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
+ pub fn new() -> std::io::Result<Runtime> {
+ Builder::new_multi_thread().enable_all().build()
+ }
+
+ /// Spawn a future onto the Tokio runtime.
+ ///
+ /// This spawns the given future onto the runtime's executor, usually a
+ /// thread pool. The thread pool is then responsible for polling the future
+ /// until it completes.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// [mod]: index.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// # fn dox() {
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ ///
+ /// // Spawn a future onto the runtime
+ /// rt.spawn(async {
+ /// println!("now running on a worker thread");
+ /// });
+ /// # }
+ /// ```
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ match &self.kind {
+ #[cfg(feature = "rt-multi-thread")]
+ Kind::ThreadPool(exec) => exec.spawn(future),
+ Kind::CurrentThread(exec) => exec.spawn(future),
+ }
+ }
+
+ /// Run the provided function on an executor dedicated to blocking operations.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// # fn dox() {
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ ///
+ /// // Spawn a blocking function onto the runtime
+ /// rt.spawn_blocking(|| {
+ /// println!("now running on a worker thread");
+ /// });
+ /// # }
+ pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ {
+ self.handle.spawn_blocking(func)
+ }
+
+ /// Run a future to completion on the Tokio runtime. This is the
+ /// runtime's entry point.
+ ///
+ /// This runs the given future on the runtime, blocking until it is
+ /// complete, and yielding its resolved result. Any tasks or timers
+ /// which the future spawns internally will be executed on the runtime.
+ ///
+ /// # Multi thread scheduler
+ ///
+ /// When the multi thread scheduler is used this will allow futures
+ /// to run within the io driver and timer context of the overall runtime.
+ ///
+ /// # Current thread scheduler
+ ///
+ /// When the current thread scheduler is enabled `block_on`
+ /// can be called concurrently from multiple threads. The first call
+ /// will take ownership of the io and timer drivers. This means
+ /// other threads which do not own the drivers will hook into that one.
+ /// When the first `block_on` completes, other threads will be able to
+ /// "steal" the driver to allow continued execution of their futures.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the provided future panics, or if not called within an
+ /// asynchronous execution context.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::runtime::Runtime;
+ ///
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ ///
+ /// // Execute the future, blocking the current thread until completion
+ /// rt.block_on(async {
+ /// println!("hello");
+ /// });
+ /// ```
+ ///
+ /// [handle]: fn@Handle::block_on
+ pub fn block_on<F: Future>(&self, future: F) -> F::Output {
+ let _enter = self.enter();
+
+ match &self.kind {
+ Kind::CurrentThread(exec) => exec.block_on(future),
+ #[cfg(feature = "rt-multi-thread")]
+ Kind::ThreadPool(exec) => exec.block_on(future),
+ }
+ }
+
+ /// Enter the runtime context.
+ ///
+ /// This allows you to construct types that must have an executor
+ /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
+ /// also allow you to call methods such as [`tokio::spawn`].
+ ///
+ /// [`Sleep`]: struct@crate::time::Sleep
+ /// [`TcpStream`]: struct@crate::net::TcpStream
+ /// [`tokio::spawn`]: fn@crate::spawn
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// fn function_that_spawns(msg: String) {
+ /// // Had we not used `rt.enter` below, this would panic.
+ /// tokio::spawn(async move {
+ /// println!("{}", msg);
+ /// });
+ /// }
+ ///
+ /// fn main() {
+ /// let rt = Runtime::new().unwrap();
+ ///
+ /// let s = "Hello World!".to_string();
+ ///
+ /// // By entering the context, we tie `tokio::spawn` to this executor.
+ /// let _guard = rt.enter();
+ /// function_that_spawns(s);
+ /// }
+ /// ```
+ pub fn enter(&self) -> EnterGuard<'_> {
+ EnterGuard {
+ rt: self,
+ guard: context::enter(self.handle.clone()),
+ }
+ }
+
+ /// Shutdown the runtime, waiting for at most `duration` for all spawned
+ /// task to shutdown.
+ ///
+ /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to
+ /// shutdown in a timely fashion. However, dropping a `Runtime` will wait
+ /// indefinitely for all tasks to terminate, and there are cases where a long
+ /// blocking task has been spawned, which can block dropping `Runtime`.
+ ///
+ /// In this case, calling `shutdown_timeout` with an explicit wait timeout
+ /// can work. The `shutdown_timeout` will signal all tasks to shutdown and
+ /// will wait for at most `duration` for all spawned tasks to terminate. If
+ /// `timeout` elapses before all tasks are dropped, the function returns and
+ /// outstanding tasks are potentially leaked.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use tokio::task;
+ ///
+ /// use std::thread;
+ /// use std::time::Duration;
+ ///
+ /// fn main() {
+ /// let runtime = Runtime::new().unwrap();
+ ///
+ /// runtime.block_on(async move {
+ /// task::spawn_blocking(move || {
+ /// thread::sleep(Duration::from_secs(10_000));
+ /// });
+ /// });
+ ///
+ /// runtime.shutdown_timeout(Duration::from_millis(100));
+ /// }
+ /// ```
+ pub fn shutdown_timeout(mut self, duration: Duration) {
+ // Wakeup and shutdown all the worker threads
+ self.handle.spawner.shutdown();
+ self.blocking_pool.shutdown(Some(duration));
+ }
+
+ /// Shutdown the runtime, without waiting for any spawned tasks to shutdown.
+ ///
+ /// This can be useful if you want to drop a runtime from within another runtime.
+ /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
+ /// to complete, which would normally not be permitted within an asynchronous context.
+ /// By calling `shutdown_background()`, you can drop the runtime from such a context.
+ ///
+ /// Note however, that because we do not wait for any blocking tasks to complete, this
+ /// may result in a resource leak (in that any blocking tasks are still running until they
+ /// return.
+ ///
+ /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`.
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// fn main() {
+ /// let runtime = Runtime::new().unwrap();
+ ///
+ /// runtime.block_on(async move {
+ /// let inner_runtime = Runtime::new().unwrap();
+ /// // ...
+ /// inner_runtime.shutdown_background();
+ /// });
+ /// }
+ /// ```
+ pub fn shutdown_background(self) {
+ self.shutdown_timeout(Duration::from_nanos(0))
+ }
}
}
diff --git a/src/runtime/park.rs b/src/runtime/park.rs
index ee437d1..033b9f2 100644
--- a/src/runtime/park.rs
+++ b/src/runtime/park.rs
@@ -6,7 +6,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::park::{Park, Unpark};
-use crate::runtime::time;
+use crate::runtime::driver::Driver;
use crate::util::TryLock;
use std::sync::atomic::Ordering::SeqCst;
@@ -42,14 +42,14 @@ const NOTIFIED: usize = 3;
/// Shared across multiple Parker handles
struct Shared {
/// Shared driver. Only one thread at a time can use this
- driver: TryLock<time::Driver>,
+ driver: TryLock<Driver>,
/// Unpark handle
- handle: <time::Driver as Park>::Unpark,
+ handle: <Driver as Park>::Unpark,
}
impl Parker {
- pub(crate) fn new(driver: time::Driver) -> Parker {
+ pub(crate) fn new(driver: Driver) -> Parker {
let handle = driver.unpark();
Parker {
@@ -104,6 +104,10 @@ impl Park for Parker {
Ok(())
}
}
+
+ fn shutdown(&mut self) {
+ self.inner.shutdown();
+ }
}
impl Unpark for Unparker {
@@ -138,7 +142,7 @@ impl Inner {
fn park_condvar(&self) {
// Otherwise we need to coordinate going to sleep
- let mut m = self.mutex.lock().unwrap();
+ let mut m = self.mutex.lock();
match self
.state
@@ -176,7 +180,7 @@ impl Inner {
}
}
- fn park_driver(&self, driver: &mut time::Driver) {
+ fn park_driver(&self, driver: &mut Driver) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
@@ -234,7 +238,7 @@ impl Inner {
// Releasing `lock` before the call to `notify_one` means that when the
// parked thread wakes it doesn't get woken only to have to wait for us
// to release `lock`.
- drop(self.mutex.lock().unwrap());
+ drop(self.mutex.lock());
self.condvar.notify_one()
}
@@ -242,4 +246,12 @@ impl Inner {
fn unpark_driver(&self) {
self.shared.handle.unpark();
}
+
+ fn shutdown(&self) {
+ if let Some(mut driver) = self.shared.driver.try_lock() {
+ driver.shutdown();
+ }
+
+ self.condvar.notify_all();
+ }
}
diff --git a/src/runtime/queue.rs b/src/runtime/queue.rs
index c654514..cdf4009 100644
--- a/src/runtime/queue.rs
+++ b/src/runtime/queue.rs
@@ -481,7 +481,7 @@ impl<T: 'static> Inject<T> {
/// Close the injection queue, returns `true` if the queue is open when the
/// transition is made.
pub(super) fn close(&self) -> bool {
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if p.is_closed {
return false;
@@ -492,7 +492,7 @@ impl<T: 'static> Inject<T> {
}
pub(super) fn is_closed(&self) -> bool {
- self.pointers.lock().unwrap().is_closed
+ self.pointers.lock().is_closed
}
pub(super) fn len(&self) -> usize {
@@ -502,7 +502,7 @@ impl<T: 'static> Inject<T> {
/// Pushes a value into the queue.
pub(super) fn push(&self, task: task::Notified<T>) {
// Acquire queue lock
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if p.is_closed {
// Drop the mutex to avoid a potential deadlock when
@@ -541,7 +541,7 @@ impl<T: 'static> Inject<T> {
debug_assert!(get_next(batch_tail).is_none());
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
if let Some(tail) = p.tail {
set_next(tail, Some(batch_head));
@@ -566,7 +566,7 @@ impl<T: 'static> Inject<T> {
return None;
}
- let mut p = self.pointers.lock().unwrap();
+ let mut p = self.pointers.lock();
// It is possible to hit null here if another thread poped the last
// task between us checking `len` and acquiring the lock.
diff --git a/src/runtime/shell.rs b/src/runtime/shell.rs
index a65869d..486d4fa 100644
--- a/src/runtime/shell.rs
+++ b/src/runtime/shell.rs
@@ -1,52 +1,84 @@
#![allow(clippy::redundant_clone)]
+use crate::future::poll_fn;
use crate::park::{Park, Unpark};
-use crate::runtime::enter;
-use crate::runtime::time;
+use crate::runtime::driver::Driver;
+use crate::sync::Notify;
use crate::util::{waker_ref, Wake};
-use std::future::Future;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::task::Context;
-use std::task::Poll::Ready;
+use std::task::Poll::{Pending, Ready};
+use std::{future::Future, sync::PoisonError};
#[derive(Debug)]
pub(super) struct Shell {
- driver: time::Driver,
+ driver: Mutex<Option<Driver>>,
+
+ notify: Notify,
/// TODO: don't store this
unpark: Arc<Handle>,
}
#[derive(Debug)]
-struct Handle(<time::Driver as Park>::Unpark);
+struct Handle(<Driver as Park>::Unpark);
impl Shell {
- pub(super) fn new(driver: time::Driver) -> Shell {
+ pub(super) fn new(driver: Driver) -> Shell {
let unpark = Arc::new(Handle(driver.unpark()));
- Shell { driver, unpark }
+ Shell {
+ driver: Mutex::new(Some(driver)),
+ notify: Notify::new(),
+ unpark,
+ }
}
- pub(super) fn block_on<F>(&mut self, f: F) -> F::Output
+ pub(super) fn block_on<F>(&self, f: F) -> F::Output
where
F: Future,
{
- let _e = enter(true);
+ let mut enter = crate::runtime::enter(true);
pin!(f);
- let waker = waker_ref(&self.unpark);
- let mut cx = Context::from_waker(&waker);
-
loop {
- if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
- return v;
- }
+ if let Some(driver) = &mut self.take_driver() {
+ return driver.block_on(f);
+ } else {
+ let notified = self.notify.notified();
+ pin!(notified);
+
+ if let Some(out) = enter
+ .block_on(poll_fn(|cx| {
+ if notified.as_mut().poll(cx).is_ready() {
+ return Ready(None);
+ }
+
+ if let Ready(out) = f.as_mut().poll(cx) {
+ return Ready(Some(out));
+ }
- self.driver.park().unwrap();
+ Pending
+ }))
+ .expect("Failed to `Enter::block_on`")
+ {
+ return out;
+ }
+ }
}
}
+
+ fn take_driver(&self) -> Option<DriverGuard<'_>> {
+ let mut lock = self.driver.lock().unwrap();
+ let driver = lock.take()?;
+
+ Some(DriverGuard {
+ inner: Some(driver),
+ shell: &self,
+ })
+ }
}
impl Wake for Handle {
@@ -60,3 +92,41 @@ impl Wake for Handle {
arc_self.0.unpark();
}
}
+
+struct DriverGuard<'a> {
+ inner: Option<Driver>,
+ shell: &'a Shell,
+}
+
+impl DriverGuard<'_> {
+ fn block_on<F: Future>(&mut self, f: F) -> F::Output {
+ let driver = self.inner.as_mut().unwrap();
+
+ pin!(f);
+
+ let waker = waker_ref(&self.shell.unpark);
+ let mut cx = Context::from_waker(&waker);
+
+ loop {
+ if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
+ return v;
+ }
+
+ driver.park().unwrap();
+ }
+ }
+}
+
+impl Drop for DriverGuard<'_> {
+ fn drop(&mut self) {
+ if let Some(inner) = self.inner.take() {
+ self.shell
+ .driver
+ .lock()
+ .unwrap_or_else(PoisonError::into_inner)
+ .replace(inner);
+
+ self.shell.notify.notify_one();
+ }
+ }
+}
diff --git a/src/runtime/spawner.rs b/src/runtime/spawner.rs
index d136945..a37c667 100644
--- a/src/runtime/spawner.rs
+++ b/src/runtime/spawner.rs
@@ -1,24 +1,34 @@
-cfg_rt_core! {
+cfg_rt! {
use crate::runtime::basic_scheduler;
use crate::task::JoinHandle;
use std::future::Future;
}
-cfg_rt_threaded! {
+cfg_rt_multi_thread! {
use crate::runtime::thread_pool;
}
#[derive(Debug, Clone)]
pub(crate) enum Spawner {
- Shell,
- #[cfg(feature = "rt-core")]
+ #[cfg(feature = "rt")]
Basic(basic_scheduler::Spawner),
- #[cfg(feature = "rt-threaded")]
+ #[cfg(feature = "rt-multi-thread")]
ThreadPool(thread_pool::Spawner),
}
-cfg_rt_core! {
+impl Spawner {
+ pub(crate) fn shutdown(&mut self) {
+ #[cfg(feature = "rt-multi-thread")]
+ {
+ if let Spawner::ThreadPool(spawner) = self {
+ spawner.shutdown();
+ }
+ }
+ }
+}
+
+cfg_rt! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
@@ -26,10 +36,9 @@ cfg_rt_core! {
F::Output: Send + 'static,
{
match self {
- Spawner::Shell => panic!("spawning not enabled for runtime"),
- #[cfg(feature = "rt-core")]
+ #[cfg(feature = "rt")]
Spawner::Basic(spawner) => spawner.spawn(future),
- #[cfg(feature = "rt-threaded")]
+ #[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
}
}
diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs
index f4756c2..dfa8764 100644
--- a/src/runtime/task/core.rs
+++ b/src/runtime/task/core.rs
@@ -269,7 +269,7 @@ impl<T: Future, S: Schedule> Core<T, S> {
}
}
-cfg_rt_threaded! {
+cfg_rt_multi_thread! {
impl Header {
pub(crate) fn shutdown(&self) {
use crate::runtime::task::RawTask;
diff --git a/src/runtime/task/error.rs b/src/runtime/task/error.rs
index d5f65a4..177fe65 100644
--- a/src/runtime/task/error.rs
+++ b/src/runtime/task/error.rs
@@ -3,7 +3,7 @@ use std::fmt;
use std::io;
use std::sync::Mutex;
-doc_rt_core! {
+cfg_rt! {
/// Task failed to execute to completion.
pub struct JoinError {
repr: Repr,
@@ -16,25 +16,13 @@ enum Repr {
}
impl JoinError {
- #[doc(hidden)]
- #[deprecated]
- pub fn cancelled() -> JoinError {
- Self::cancelled2()
- }
-
- pub(crate) fn cancelled2() -> JoinError {
+ pub(crate) fn cancelled() -> JoinError {
JoinError {
repr: Repr::Cancelled,
}
}
- #[doc(hidden)]
- #[deprecated]
- pub fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError {
- Self::panic2(err)
- }
-
- pub(crate) fn panic2(err: Box<dyn Any + Send + 'static>) -> JoinError {
+ pub(crate) fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError {
JoinError {
repr: Repr::Panic(Mutex::new(err)),
}
@@ -42,10 +30,7 @@ impl JoinError {
/// Returns true if the error was caused by the task being cancelled
pub fn is_cancelled(&self) -> bool {
- match &self.repr {
- Repr::Cancelled => true,
- _ => false,
- }
+ matches!(&self.repr, Repr::Cancelled)
}
/// Returns true if the error was caused by the task panicking
@@ -65,10 +50,7 @@ impl JoinError {
/// }
/// ```
pub fn is_panic(&self) -> bool {
- match &self.repr {
- Repr::Panic(_) => true,
- _ => false,
- }
+ matches!(&self.repr, Repr::Panic(_))
}
/// Consumes the join error, returning the object with which the task panicked.
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs
index e86b29e..208d48c 100644
--- a/src/runtime/task/harness.rs
+++ b/src/runtime/task/harness.rs
@@ -102,7 +102,7 @@ where
// If the task is cancelled, avoid polling it, instead signalling it
// is complete.
if snapshot.is_cancelled() {
- Poll::Ready(Err(JoinError::cancelled2()))
+ Poll::Ready(Err(JoinError::cancelled()))
} else {
let res = guard.core.poll(self.header());
@@ -132,7 +132,7 @@ where
}
}
Err(err) => {
- self.complete(Err(JoinError::panic2(err)), snapshot.is_join_interested());
+ self.complete(Err(JoinError::panic(err)), snapshot.is_join_interested());
}
}
}
@@ -297,9 +297,9 @@ where
// Dropping the future panicked, complete the join
// handle with the panic to avoid dropping the panic
// on the ground.
- self.complete(Err(JoinError::panic2(err)), true);
+ self.complete(Err(JoinError::panic(err)), true);
} else {
- self.complete(Err(JoinError::cancelled2()), true);
+ self.complete(Err(JoinError::cancelled()), true);
}
}
diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs
index 3c4aabb..dedfb38 100644
--- a/src/runtime/task/join.rs
+++ b/src/runtime/task/join.rs
@@ -6,7 +6,7 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
-doc_rt_core! {
+cfg_rt! {
/// An owned permission to join on a task (await its termination).
///
/// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
@@ -45,6 +45,71 @@ doc_rt_core! {
/// # }
/// ```
///
+ /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
+ /// If the return value is an i32, the join handle has type `JoinHandle<i32>`:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn doc() {
+ /// let join_handle: task::JoinHandle<i32> = task::spawn(async {
+ /// 5 + 3
+ /// });
+ /// # }
+ ///
+ /// ```
+ ///
+ /// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn doc() {
+ /// let join_handle: task::JoinHandle<()> = task::spawn(async {
+ /// println!("I return nothing.");
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
+ /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
+ /// to be double chained to extract the returned value:
+ ///
+ /// ```
+ /// use tokio::task;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
+ /// Ok(5 + 3)
+ /// });
+ ///
+ /// let result = join_handle.await??;
+ /// assert_eq!(result, 8);
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// If the task panics, the error is a [`JoinError`] that contains the panic:
+ ///
+ /// ```
+ /// use tokio::task;
+ /// use std::io;
+ /// use std::panic;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
+ /// panic!("boom");
+ /// });
+ ///
+ /// let err = join_handle.await.unwrap_err();
+ /// assert!(err.is_panic());
+ /// Ok(())
+ /// }
+ ///
+ /// ```
/// Child being detached and outliving its parent:
///
/// ```no_run
@@ -56,7 +121,7 @@ doc_rt_core! {
/// let original_task = task::spawn(async {
/// let _detached_task = task::spawn(async {
/// // Here we sleep to make sure that the first task returns before.
- /// time::delay_for(Duration::from_millis(10)).await;
+ /// time::sleep(Duration::from_millis(10)).await;
/// // This will be called, even though the JoinHandle is dropped.
/// println!("♫ Still alive ♫");
/// });
@@ -68,13 +133,14 @@ doc_rt_core! {
/// // We make sure that the new task has time to run, before the main
/// // task returns.
///
- /// time::delay_for(Duration::from_millis(1000)).await;
+ /// time::sleep(Duration::from_millis(1000)).await;
/// # }
/// ```
///
/// [`task::spawn`]: crate::task::spawn()
/// [`task::spawn_blocking`]: crate::task::spawn_blocking
/// [`std::thread::JoinHandle`]: std::thread::JoinHandle
+ /// [`JoinError`]: crate::task::JoinError
pub struct JoinHandle<T> {
raw: Option<RawTask>,
_p: PhantomData<T>,
@@ -91,6 +157,44 @@ impl<T> JoinHandle<T> {
_p: PhantomData,
}
}
+
+ /// Abort the task associated with the handle.
+ ///
+ /// Awaiting a cancelled task might complete as usual if the task was
+ /// already completed at the time it was cancelled, but most likely it
+ /// will complete with a `Err(JoinError::Cancelled)`.
+ ///
+ /// ```rust
+ /// use tokio::time;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut handles = Vec::new();
+ ///
+ /// handles.push(tokio::spawn(async {
+ /// time::sleep(time::Duration::from_secs(10)).await;
+ /// true
+ /// }));
+ ///
+ /// handles.push(tokio::spawn(async {
+ /// time::sleep(time::Duration::from_secs(10)).await;
+ /// false
+ /// }));
+ ///
+ /// for handle in &handles {
+ /// handle.abort();
+ /// }
+ ///
+ /// for handle in handles {
+ /// assert!(handle.await.unwrap_err().is_cancelled());
+ /// }
+ /// }
+ /// ```
+ pub fn abort(&self) {
+ if let Some(raw) = self.raw {
+ raw.shutdown();
+ }
+ }
}
impl<T> Unpin for JoinHandle<T> {}
diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs
index 17b5157..7b49e95 100644
--- a/src/runtime/task/mod.rs
+++ b/src/runtime/task/mod.rs
@@ -21,7 +21,7 @@ use self::state::State;
mod waker;
-cfg_rt_threaded! {
+cfg_rt_multi_thread! {
mod stack;
pub(crate) use self::stack::TransferStack;
}
@@ -79,25 +79,27 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
}
}
-/// Create a new task with an associated join handle
-pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
-where
- T: Future + Send + 'static,
- S: Schedule,
-{
- let raw = RawTask::new::<_, S>(task);
+cfg_rt! {
+ /// Create a new task with an associated join handle
+ pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
+ where
+ T: Future + Send + 'static,
+ S: Schedule,
+ {
+ let raw = RawTask::new::<_, S>(task);
- let task = Task {
- raw,
- _p: PhantomData,
- };
+ let task = Task {
+ raw,
+ _p: PhantomData,
+ };
- let join = JoinHandle::new(raw);
+ let join = JoinHandle::new(raw);
- (Notified(task), join)
+ (Notified(task), join)
+ }
}
-cfg_rt_util! {
+cfg_rt! {
/// Create a new `!Send` task with an associated join handle
pub(crate) unsafe fn joinable_local<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
where
@@ -130,7 +132,7 @@ impl<S: 'static> Task<S> {
}
}
-cfg_rt_threaded! {
+cfg_rt_multi_thread! {
impl<S: 'static> Notified<S> {
pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> {
Notified(Task::from_raw(ptr))
diff --git a/src/runtime/tests/loom_blocking.rs b/src/runtime/tests/loom_blocking.rs
index db7048e..8fb54c5 100644
--- a/src/runtime/tests/loom_blocking.rs
+++ b/src/runtime/tests/loom_blocking.rs
@@ -8,14 +8,15 @@ fn blocking_shutdown() {
let v = Arc::new(());
let rt = mk_runtime(1);
- rt.enter(|| {
+ {
+ let _enter = rt.enter();
for _ in 0..2 {
let v = v.clone();
crate::task::spawn_blocking(move || {
assert!(1 < Arc::strong_count(&v));
});
}
- });
+ }
drop(rt);
assert_eq!(1, Arc::strong_count(&v));
@@ -23,9 +24,8 @@ fn blocking_shutdown() {
}
fn mk_runtime(num_threads: usize) -> Runtime {
- runtime::Builder::new()
- .threaded_scheduler()
- .core_threads(num_threads)
+ runtime::Builder::new_multi_thread()
+ .worker_threads(num_threads)
.build()
.unwrap()
}
diff --git a/src/runtime/tests/loom_pool.rs b/src/runtime/tests/loom_pool.rs
index c08658c..06ad641 100644
--- a/src/runtime/tests/loom_pool.rs
+++ b/src/runtime/tests/loom_pool.rs
@@ -178,7 +178,7 @@ mod group_b {
#[test]
fn join_output() {
loom::model(|| {
- let mut rt = mk_pool(1);
+ let rt = mk_pool(1);
rt.block_on(async {
let t = crate::spawn(track(async { "hello" }));
@@ -192,7 +192,7 @@ mod group_b {
#[test]
fn poll_drop_handle_then_drop() {
loom::model(|| {
- let mut rt = mk_pool(1);
+ let rt = mk_pool(1);
rt.block_on(async move {
let mut t = crate::spawn(track(async { "hello" }));
@@ -209,7 +209,7 @@ mod group_b {
#[test]
fn complete_block_on_under_load() {
loom::model(|| {
- let mut pool = mk_pool(1);
+ let pool = mk_pool(1);
pool.block_on(async {
// Trigger a re-schedule
@@ -296,9 +296,8 @@ mod group_d {
}
fn mk_pool(num_threads: usize) -> Runtime {
- runtime::Builder::new()
- .threaded_scheduler()
- .core_threads(num_threads)
+ runtime::Builder::new_multi_thread()
+ .worker_threads(num_threads)
.build()
.unwrap()
}
diff --git a/src/runtime/tests/task.rs b/src/runtime/tests/task.rs
index 82315a0..a34526f 100644
--- a/src/runtime/tests/task.rs
+++ b/src/runtime/tests/task.rs
@@ -1,5 +1,5 @@
use crate::runtime::task::{self, Schedule, Task};
-use crate::util::linked_list::LinkedList;
+use crate::util::linked_list::{Link, LinkedList};
use crate::util::TryLock;
use std::collections::VecDeque;
@@ -72,7 +72,7 @@ struct Inner {
struct Core {
queue: VecDeque<task::Notified<Runtime>>,
- tasks: LinkedList<Task<Runtime>>,
+ tasks: LinkedList<Task<Runtime>, <Task<Runtime> as Link>::Target>,
}
static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
diff --git a/src/runtime/thread_pool/atomic_cell.rs b/src/runtime/thread_pool/atomic_cell.rs
index 2bda0fc..98847e6 100644
--- a/src/runtime/thread_pool/atomic_cell.rs
+++ b/src/runtime/thread_pool/atomic_cell.rs
@@ -22,7 +22,6 @@ impl<T> AtomicCell<T> {
from_raw(old)
}
- #[cfg(feature = "blocking")]
pub(super) fn set(&self, val: Box<T>) {
let _ = self.swap(Some(val));
}
diff --git a/src/runtime/thread_pool/idle.rs b/src/runtime/thread_pool/idle.rs
index ae87ca4..6e692fd 100644
--- a/src/runtime/thread_pool/idle.rs
+++ b/src/runtime/thread_pool/idle.rs
@@ -55,7 +55,7 @@ impl Idle {
}
// Acquire the lock
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
// Check again, now that the lock is acquired
if !self.notify_should_wakeup() {
@@ -77,7 +77,7 @@ impl Idle {
/// work.
pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
// Acquire the lock
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
// Decrement the number of unparked threads
let ret = State::dec_num_unparked(&self.state, is_searching);
@@ -112,7 +112,7 @@ impl Idle {
/// Unpark a specific worker. This happens if tasks are submitted from
/// within the worker's park routine.
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
- let mut sleepers = self.sleepers.lock().unwrap();
+ let mut sleepers = self.sleepers.lock();
for index in 0..sleepers.len() {
if sleepers[index] == worker_id {
@@ -128,7 +128,7 @@ impl Idle {
/// Returns `true` if `worker_id` is contained in the sleep set
pub(super) fn is_parked(&self, worker_id: usize) -> bool {
- let sleepers = self.sleepers.lock().unwrap();
+ let sleepers = self.sleepers.lock();
sleepers.contains(&worker_id)
}
diff --git a/src/runtime/thread_pool/mod.rs b/src/runtime/thread_pool/mod.rs
index ced9712..e39695a 100644
--- a/src/runtime/thread_pool/mod.rs
+++ b/src/runtime/thread_pool/mod.rs
@@ -9,9 +9,7 @@ use self::idle::Idle;
mod worker;
pub(crate) use worker::Launch;
-cfg_blocking! {
- pub(crate) use worker::block_in_place;
-}
+pub(crate) use worker::block_in_place;
use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
@@ -91,7 +89,7 @@ impl fmt::Debug for ThreadPool {
impl Drop for ThreadPool {
fn drop(&mut self) {
- self.spawner.shared.close();
+ self.spawner.shutdown();
}
}
@@ -108,6 +106,10 @@ impl Spawner {
self.shared.schedule(task, false);
handle
}
+
+ pub(crate) fn shutdown(&mut self) {
+ self.shared.close();
+ }
}
impl fmt::Debug for Spawner {
diff --git a/src/runtime/thread_pool/worker.rs b/src/runtime/thread_pool/worker.rs
index abe20da..bc544c9 100644
--- a/src/runtime/thread_pool/worker.rs
+++ b/src/runtime/thread_pool/worker.rs
@@ -9,10 +9,11 @@ use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
+use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
-use crate::util::linked_list::LinkedList;
+use crate::util::linked_list::{Link, LinkedList};
use crate::util::FastRand;
use std::cell::RefCell;
@@ -53,7 +54,7 @@ struct Core {
is_shutdown: bool,
/// Tasks owned by the core
- tasks: LinkedList<Task>,
+ tasks: LinkedList<Task, <Task as Link>::Target>,
/// Parker
///
@@ -172,104 +173,96 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
(shared, launch)
}
-cfg_blocking! {
- use crate::runtime::enter::EnterContext;
-
- pub(crate) fn block_in_place<F, R>(f: F) -> R
- where
- F: FnOnce() -> R,
- {
- // Try to steal the worker core back
- struct Reset(coop::Budget);
-
- impl Drop for Reset {
- fn drop(&mut self) {
- CURRENT.with(|maybe_cx| {
- if let Some(cx) = maybe_cx {
- let core = cx.worker.core.take();
- let mut cx_core = cx.core.borrow_mut();
- assert!(cx_core.is_none());
- *cx_core = core;
-
- // Reset the task budget as we are re-entering the
- // runtime.
- coop::set(self.0);
- }
- });
- }
+pub(crate) fn block_in_place<F, R>(f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ // Try to steal the worker core back
+ struct Reset(coop::Budget);
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ CURRENT.with(|maybe_cx| {
+ if let Some(cx) = maybe_cx {
+ let core = cx.worker.core.take();
+ let mut cx_core = cx.core.borrow_mut();
+ assert!(cx_core.is_none());
+ *cx_core = core;
+
+ // Reset the task budget as we are re-entering the
+ // runtime.
+ coop::set(self.0);
+ }
+ });
}
+ }
- let mut had_core = false;
- let mut had_entered = false;
+ let mut had_entered = false;
- CURRENT.with(|maybe_cx| {
- match (crate::runtime::enter::context(), maybe_cx.is_some()) {
- (EnterContext::Entered { .. }, true) => {
- // We are on a thread pool runtime thread, so we just need to set up blocking.
+ CURRENT.with(|maybe_cx| {
+ match (crate::runtime::enter::context(), maybe_cx.is_some()) {
+ (EnterContext::Entered { .. }, true) => {
+ // We are on a thread pool runtime thread, so we just need to set up blocking.
+ had_entered = true;
+ }
+ (EnterContext::Entered { allow_blocking }, false) => {
+ // We are on an executor, but _not_ on the thread pool.
+ // That is _only_ okay if we are in a thread pool runtime's block_on method:
+ if allow_blocking {
had_entered = true;
- }
- (EnterContext::Entered { allow_blocking }, false) => {
- // We are on an executor, but _not_ on the thread pool.
- // That is _only_ okay if we are in a thread pool runtime's block_on method:
- if allow_blocking {
- had_entered = true;
- return;
- } else {
- // This probably means we are on the basic_scheduler or in a LocalSet,
- // where it is _not_ okay to block.
- panic!("can call blocking only when running on the multi-threaded runtime");
- }
- }
- (EnterContext::NotEntered, true) => {
- // This is a nested call to block_in_place (we already exited).
- // All the necessary setup has already been done.
- return;
- }
- (EnterContext::NotEntered, false) => {
- // We are outside of the tokio runtime, so blocking is fine.
- // We can also skip all of the thread pool blocking setup steps.
return;
+ } else {
+ // This probably means we are on the basic_scheduler or in a LocalSet,
+ // where it is _not_ okay to block.
+ panic!("can call blocking only when running on the multi-threaded runtime");
}
}
+ (EnterContext::NotEntered, true) => {
+ // This is a nested call to block_in_place (we already exited).
+ // All the necessary setup has already been done.
+ return;
+ }
+ (EnterContext::NotEntered, false) => {
+ // We are outside of the tokio runtime, so blocking is fine.
+ // We can also skip all of the thread pool blocking setup steps.
+ return;
+ }
+ }
- let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
-
- // Get the worker core. If none is set, then blocking is fine!
- let core = match cx.core.borrow_mut().take() {
- Some(core) => core,
- None => return,
- };
-
- // The parker should be set here
- assert!(core.park.is_some());
+ let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
- // In order to block, the core must be sent to another thread for
- // execution.
- //
- // First, move the core back into the worker's shared core slot.
- cx.worker.core.set(core);
- had_core = true;
+ // Get the worker core. If none is set, then blocking is fine!
+ let core = match cx.core.borrow_mut().take() {
+ Some(core) => core,
+ None => return,
+ };
- // Next, clone the worker handle and send it to a new thread for
- // processing.
- //
- // Once the blocking task is done executing, we will attempt to
- // steal the core back.
- let worker = cx.worker.clone();
- runtime::spawn_blocking(move || run(worker));
- });
+ // The parker should be set here
+ assert!(core.park.is_some());
+
+ // In order to block, the core must be sent to another thread for
+ // execution.
+ //
+ // First, move the core back into the worker's shared core slot.
+ cx.worker.core.set(core);
+
+ // Next, clone the worker handle and send it to a new thread for
+ // processing.
+ //
+ // Once the blocking task is done executing, we will attempt to
+ // steal the core back.
+ let worker = cx.worker.clone();
+ runtime::spawn_blocking(move || run(worker));
+ });
- if had_core {
- // Unset the current task's budget. Blocking sections are not
- // constrained by task budgets.
- let _reset = Reset(coop::stop());
+ if had_entered {
+ // Unset the current task's budget. Blocking sections are not
+ // constrained by task budgets.
+ let _reset = Reset(coop::stop());
- crate::runtime::enter::exit(f)
- } else if had_entered {
- crate::runtime::enter::exit(f)
- } else {
- f()
- }
+ crate::runtime::enter::exit(f)
+ } else {
+ f()
}
}
@@ -576,6 +569,8 @@ impl Core {
// Drain the queue
while self.next_local_task().is_some() {}
+
+ park.shutdown();
}
fn drain_pending_drop(&mut self, worker: &Worker) {
@@ -785,7 +780,7 @@ impl Shared {
///
/// If all workers have reached this point, the final cleanup is performed.
fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
- let mut workers = self.shutdown_workers.lock().unwrap();
+ let mut workers = self.shutdown_workers.lock();
workers.push((core, worker));
if workers.len() != self.remotes.len() {
diff --git a/src/runtime/time.rs b/src/runtime/time.rs
deleted file mode 100644
index c623d96..0000000
--- a/src/runtime/time.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-//! Abstracts out the APIs necessary to `Runtime` for integrating the time
-//! driver. When the `time` feature flag is **not** enabled. These APIs are
-//! shells. This isolates the complexity of dealing with conditional
-//! compilation.
-
-pub(crate) use variant::*;
-
-#[cfg(feature = "time")]
-mod variant {
- use crate::park::Either;
- use crate::runtime::io;
- use crate::time::{self, driver};
-
- pub(crate) type Clock = time::Clock;
- pub(crate) type Driver = Either<driver::Driver<io::Driver>, io::Driver>;
- pub(crate) type Handle = Option<driver::Handle>;
-
- pub(crate) fn create_clock() -> Clock {
- Clock::new()
- }
-
- /// Create a new timer driver / handle pair
- pub(crate) fn create_driver(
- enable: bool,
- io_driver: io::Driver,
- clock: Clock,
- ) -> (Driver, Handle) {
- if enable {
- let driver = driver::Driver::new(io_driver, clock);
- let handle = driver.handle();
-
- (Either::A(driver), Some(handle))
- } else {
- (Either::B(io_driver), None)
- }
- }
-}
-
-#[cfg(not(feature = "time"))]
-mod variant {
- use crate::runtime::io;
-
- pub(crate) type Clock = ();
- pub(crate) type Driver = io::Driver;
- pub(crate) type Handle = ();
-
- pub(crate) fn create_clock() -> Clock {
- ()
- }
-
- /// Create a new timer driver / handle pair
- pub(crate) fn create_driver(
- _enable: bool,
- io_driver: io::Driver,
- _clock: Clock,
- ) -> (Driver, Handle) {
- (io_driver, ())
- }
-}