aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/task/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime/task/mod.rs')
-rw-r--r--src/runtime/task/mod.rs211
1 files changed, 195 insertions, 16 deletions
diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs
index 1f18209..fea6e0f 100644
--- a/src/runtime/task/mod.rs
+++ b/src/runtime/task/mod.rs
@@ -25,7 +25,7 @@
//!
//! The task uses a reference count to keep track of how many active references
//! exist. The Unowned reference type takes up two ref-counts. All other
-//! reference types take pu a single ref-count.
+//! reference types take up a single ref-count.
//!
//! Besides the waker type, each task has at most one of each reference type.
//!
@@ -47,7 +47,8 @@
//!
//! * JOIN_INTEREST - Is set to one if there exists a JoinHandle.
//!
-//! * JOIN_WAKER - Is set to one if the JoinHandle has set a waker.
+//! * JOIN_WAKER - Acts as an access control bit for the join handle waker. The
+//! protocol for its usage is described below.
//!
//! The rest of the bits are used for the ref-count.
//!
@@ -71,10 +72,38 @@
//! a lock for the stage field, and it can be accessed only by the thread
//! that set RUNNING to one.
//!
-//! * If JOIN_WAKER is zero, then the JoinHandle has exclusive access to the
-//! join handle waker. If JOIN_WAKER and COMPLETE are both one, then the
-//! thread that set COMPLETE to one has exclusive access to the join handle
-//! waker.
+//! * The waker field may be concurrently accessed by different threads: in one
+//! thread the runtime may complete a task and *read* the waker field to
+//! invoke the waker, and in another thread the task's JoinHandle may be
+//! polled, and if the task hasn't yet completed, the JoinHandle may *write*
+//! a waker to the waker field. The JOIN_WAKER bit ensures safe access by
+//! multiple threads to the waker field using the following rules:
+//!
+//! 1. JOIN_WAKER is initialized to zero.
+//!
+//! 2. If JOIN_WAKER is zero, then the JoinHandle has exclusive (mutable)
+//! access to the waker field.
+//!
+//! 3. If JOIN_WAKER is one, then the JoinHandle has shared (read-only)
+//! access to the waker field.
+//!
+//! 4. If JOIN_WAKER is one and COMPLETE is one, then the runtime has shared
+//! (read-only) access to the waker field.
+//!
+//! 5. If the JoinHandle needs to write to the waker field, then the
+//! JoinHandle needs to (i) successfully set JOIN_WAKER to zero if it is
+//! not already zero to gain exclusive access to the waker field per rule
+//! 2, (ii) write a waker, and (iii) successfully set JOIN_WAKER to one.
+//!
+//! 6. The JoinHandle can change JOIN_WAKER only if COMPLETE is zero (i.e.
+//! the task hasn't yet completed).
+//!
+//! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a
+//! race. If step (i) fails, then the attempt to write a waker is aborted. If
+//! step (iii) fails because COMPLETE is set to one by another thread after
+//! step (i), then the waker field is cleared. Once COMPLETE is one (i.e.
+//! task has completed), the JoinHandle will not modify JOIN_WAKER. After the
+//! runtime sets COMPLETE to one, it invokes the waker if there is one.
//!
//! All other fields are immutable and can be accessed immutably without
//! synchronization by anyone.
@@ -121,7 +150,7 @@
//! 1. The output is created on the thread that the future was polled on. Since
//! only non-Send futures can have non-Send output, the future was polled on
//! the thread that the future was spawned from.
-//! 2. Since JoinHandle<Output> is not Send if Output is not Send, the
+//! 2. Since `JoinHandle<Output>` is not Send if Output is not Send, the
//! JoinHandle is also on the thread that the future was spawned from.
//! 3. Thus, the JoinHandle will not move the output across threads when it
//! takes or drops the output.
@@ -135,6 +164,12 @@
//! poll call will notice it when the poll finishes, and the task is cancelled
//! at that point.
+// Some task infrastructure is here to support `JoinSet`, which is currently
+// unstable. This should be removed once `JoinSet` is stabilized.
+#![cfg_attr(not(tokio_unstable), allow(dead_code))]
+
+use crate::runtime::context;
+
mod core;
use self::core::Cell;
use self::core::Header;
@@ -151,7 +186,14 @@ cfg_rt_multi_thread! {
pub(super) use self::inject::Inject;
}
+#[cfg(feature = "rt")]
+mod abort;
mod join;
+
+#[cfg(feature = "rt")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::abort::AbortHandle;
+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::join::JoinHandle;
@@ -173,6 +215,70 @@ use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};
+/// An opaque ID that uniquely identifies a task relative to all other currently
+/// running tasks.
+///
+/// # Notes
+///
+/// - Task IDs are unique relative to other *currently running* tasks. When a
+/// task completes, the same ID may be used for another task.
+/// - Task IDs are *not* sequential, and do not indicate the order in which
+/// tasks are spawned, what runtime a task is spawned on, or any other data.
+/// - The task ID of the currently running task can be obtained from inside the
+/// task via the [`task::try_id()`](crate::task::try_id()) and
+/// [`task::id()`](crate::task::id()) functions and from outside the task via
+/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [unstable]: crate#unstable-features
+#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
+pub struct Id(u64);
+
+/// Returns the [`Id`] of the currently running task.
+///
+/// # Panics
+///
+/// This function panics if called from outside a task. Please note that calls
+/// to `block_on` do not have task IDs, so the method will panic if called from
+/// within a call to `block_on`. For a version of this function that doesn't
+/// panic, see [`task::try_id()`](crate::runtime::task::try_id()).
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [task ID]: crate::task::Id
+/// [unstable]: crate#unstable-features
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[track_caller]
+pub fn id() -> Id {
+ context::current_task_id().expect("Can't get a task id when not inside a task")
+}
+
+/// Returns the [`Id`] of the currently running task, or `None` if called outside
+/// of a task.
+///
+/// This function is similar to [`task::id()`](crate::runtime::task::id()), except
+/// that it returns `None` rather than panicking if called outside of a task
+/// context.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [task ID]: crate::task::Id
+/// [unstable]: crate#unstable-features
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[track_caller]
+pub fn try_id() -> Option<Id> {
+ context::current_task_id()
+}
+
/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
@@ -230,6 +336,11 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
fn yield_now(&self, task: Notified<Self>) {
self.schedule(task);
}
+
+ /// Polling the task resulted in a panic. Should the runtime shutdown?
+ fn unhandled_panic(&self) {
+ // By default, do nothing. This maintains the 1.0 behavior.
+ }
}
cfg_rt! {
@@ -239,14 +350,15 @@ cfg_rt! {
/// notification.
fn new_task<T, S>(
task: T,
- scheduler: S
+ scheduler: S,
+ id: Id,
) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
- let raw = RawTask::new::<T, S>(task, scheduler);
+ let raw = RawTask::new::<T, S>(task, scheduler, id);
let task = Task {
raw,
_p: PhantomData,
@@ -264,13 +376,13 @@ cfg_rt! {
/// only when the task is not going to be stored in an `OwnedTasks` list.
///
/// Currently only blocking tasks use this method.
- pub(crate) fn unowned<T, S>(task: T, scheduler: S) -> (UnownedTask<S>, JoinHandle<T::Output>)
+ pub(crate) fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>)
where
S: Schedule,
T: Send + Future + 'static,
T::Output: Send + 'static,
{
- let (task, notified, join) = new_task(task, scheduler);
+ let (task, notified, join) = new_task(task, scheduler, id);
// This transfers the ref-count of task and notified into an UnownedTask.
// This is valid because an UnownedTask holds two ref-counts.
@@ -296,6 +408,10 @@ impl<S: 'static> Task<S> {
fn header(&self) -> &Header {
self.raw.header()
}
+
+ fn header_ptr(&self) -> NonNull<Header> {
+ self.raw.header_ptr()
+ }
}
impl<S: 'static> Notified<S> {
@@ -313,7 +429,7 @@ cfg_rt_multi_thread! {
impl<S: 'static> Task<S> {
fn into_raw(self) -> NonNull<Header> {
- let ret = self.header().into();
+ let ret = self.raw.header_ptr();
mem::forget(self);
ret
}
@@ -327,7 +443,7 @@ cfg_rt_multi_thread! {
}
impl<S: Schedule> Task<S> {
- /// Pre-emptively cancels the task as part of the shutdown process.
+ /// Preemptively cancels the task as part of the shutdown process.
pub(crate) fn shutdown(self) {
let raw = self.raw;
mem::forget(self);
@@ -347,6 +463,7 @@ impl<S: Schedule> LocalNotified<S> {
impl<S: Schedule> UnownedTask<S> {
// Used in test of the inject queue.
#[cfg(test)]
+ #[cfg_attr(tokio_wasm, allow(dead_code))]
pub(super) fn into_notified(self) -> Notified<S> {
Notified(self.into_task())
}
@@ -426,7 +543,7 @@ unsafe impl<S> linked_list::Link for Task<S> {
type Target = Header;
fn as_raw(handle: &Task<S>) -> NonNull<Header> {
- handle.header().into()
+ handle.raw.header_ptr()
}
unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
@@ -434,7 +551,69 @@ unsafe impl<S> linked_list::Link for Task<S> {
}
unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
- // Not super great as it avoids some of looms checking...
- NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr))
+ self::core::Trailer::addr_of_owned(Header::get_trailer(target))
+ }
+}
+
+impl fmt::Display for Id {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(f)
+ }
+}
+
+impl Id {
+ // When 64-bit atomics are available, use a static `AtomicU64` counter to
+ // generate task IDs.
+ //
+ // Note(eliza): we _could_ just use `crate::loom::AtomicU64`, which switches
+ // between an atomic and mutex-based implementation here, rather than having
+ // two separate functions for targets with and without 64-bit atomics.
+ // However, because we can't use the mutex-based implementation in a static
+ // initializer directly, the 32-bit impl also has to use a `OnceCell`, and I
+ // thought it was nicer to avoid the `OnceCell` overhead on 64-bit
+ // platforms...
+ cfg_has_atomic_u64! {
+ pub(crate) fn next() -> Self {
+ use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
+ static NEXT_ID: AtomicU64 = AtomicU64::new(1);
+ Self(NEXT_ID.fetch_add(1, Relaxed))
+ }
+ }
+
+ cfg_not_has_atomic_u64! {
+ cfg_has_const_mutex_new! {
+ pub(crate) fn next() -> Self {
+ use crate::loom::sync::Mutex;
+ static NEXT_ID: Mutex<u64> = Mutex::const_new(1);
+
+ let mut lock = NEXT_ID.lock();
+ let id = *lock;
+ *lock += 1;
+ Self(id)
+ }
+ }
+
+ cfg_not_has_const_mutex_new! {
+ pub(crate) fn next() -> Self {
+ use crate::util::once_cell::OnceCell;
+ use crate::loom::sync::Mutex;
+
+ fn init_next_id() -> Mutex<u64> {
+ Mutex::new(1)
+ }
+
+ static NEXT_ID: OnceCell<Mutex<u64>> = OnceCell::new();
+
+ let next_id = NEXT_ID.get(init_next_id);
+ let mut lock = next_id.lock();
+ let id = *lock;
+ *lock += 1;
+ Self(id)
+ }
+ }
+ }
+
+ pub(crate) fn as_u64(&self) -> u64 {
+ self.0
}
}