diff options
Diffstat (limited to 'src/runtime/task/mod.rs')
-rw-r--r-- | src/runtime/task/mod.rs | 211 |
1 files changed, 195 insertions, 16 deletions
diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs index 1f18209..fea6e0f 100644 --- a/src/runtime/task/mod.rs +++ b/src/runtime/task/mod.rs @@ -25,7 +25,7 @@ //! //! The task uses a reference count to keep track of how many active references //! exist. The Unowned reference type takes up two ref-counts. All other -//! reference types take pu a single ref-count. +//! reference types take up a single ref-count. //! //! Besides the waker type, each task has at most one of each reference type. //! @@ -47,7 +47,8 @@ //! //! * JOIN_INTEREST - Is set to one if there exists a JoinHandle. //! -//! * JOIN_WAKER - Is set to one if the JoinHandle has set a waker. +//! * JOIN_WAKER - Acts as an access control bit for the join handle waker. The +//! protocol for its usage is described below. //! //! The rest of the bits are used for the ref-count. //! @@ -71,10 +72,38 @@ //! a lock for the stage field, and it can be accessed only by the thread //! that set RUNNING to one. //! -//! * If JOIN_WAKER is zero, then the JoinHandle has exclusive access to the -//! join handle waker. If JOIN_WAKER and COMPLETE are both one, then the -//! thread that set COMPLETE to one has exclusive access to the join handle -//! waker. +//! * The waker field may be concurrently accessed by different threads: in one +//! thread the runtime may complete a task and *read* the waker field to +//! invoke the waker, and in another thread the task's JoinHandle may be +//! polled, and if the task hasn't yet completed, the JoinHandle may *write* +//! a waker to the waker field. The JOIN_WAKER bit ensures safe access by +//! multiple threads to the waker field using the following rules: +//! +//! 1. JOIN_WAKER is initialized to zero. +//! +//! 2. If JOIN_WAKER is zero, then the JoinHandle has exclusive (mutable) +//! access to the waker field. +//! +//! 3. If JOIN_WAKER is one, then the JoinHandle has shared (read-only) +//! access to the waker field. +//! +//! 4. If JOIN_WAKER is one and COMPLETE is one, then the runtime has shared +//! (read-only) access to the waker field. +//! +//! 5. If the JoinHandle needs to write to the waker field, then the +//! JoinHandle needs to (i) successfully set JOIN_WAKER to zero if it is +//! not already zero to gain exclusive access to the waker field per rule +//! 2, (ii) write a waker, and (iii) successfully set JOIN_WAKER to one. +//! +//! 6. The JoinHandle can change JOIN_WAKER only if COMPLETE is zero (i.e. +//! the task hasn't yet completed). +//! +//! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a +//! race. If step (i) fails, then the attempt to write a waker is aborted. If +//! step (iii) fails because COMPLETE is set to one by another thread after +//! step (i), then the waker field is cleared. Once COMPLETE is one (i.e. +//! task has completed), the JoinHandle will not modify JOIN_WAKER. After the +//! runtime sets COMPLETE to one, it invokes the waker if there is one. //! //! All other fields are immutable and can be accessed immutably without //! synchronization by anyone. @@ -121,7 +150,7 @@ //! 1. The output is created on the thread that the future was polled on. Since //! only non-Send futures can have non-Send output, the future was polled on //! the thread that the future was spawned from. -//! 2. Since JoinHandle<Output> is not Send if Output is not Send, the +//! 2. Since `JoinHandle<Output>` is not Send if Output is not Send, the //! JoinHandle is also on the thread that the future was spawned from. //! 3. Thus, the JoinHandle will not move the output across threads when it //! takes or drops the output. @@ -135,6 +164,12 @@ //! poll call will notice it when the poll finishes, and the task is cancelled //! at that point. +// Some task infrastructure is here to support `JoinSet`, which is currently +// unstable. This should be removed once `JoinSet` is stabilized. +#![cfg_attr(not(tokio_unstable), allow(dead_code))] + +use crate::runtime::context; + mod core; use self::core::Cell; use self::core::Header; @@ -151,7 +186,14 @@ cfg_rt_multi_thread! { pub(super) use self::inject::Inject; } +#[cfg(feature = "rt")] +mod abort; mod join; + +#[cfg(feature = "rt")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::abort::AbortHandle; + #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::join::JoinHandle; @@ -173,6 +215,70 @@ use std::marker::PhantomData; use std::ptr::NonNull; use std::{fmt, mem}; +/// An opaque ID that uniquely identifies a task relative to all other currently +/// running tasks. +/// +/// # Notes +/// +/// - Task IDs are unique relative to other *currently running* tasks. When a +/// task completes, the same ID may be used for another task. +/// - Task IDs are *not* sequential, and do not indicate the order in which +/// tasks are spawned, what runtime a task is spawned on, or any other data. +/// - The task ID of the currently running task can be obtained from inside the +/// task via the [`task::try_id()`](crate::task::try_id()) and +/// [`task::id()`](crate::task::id()) functions and from outside the task via +/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [unstable]: crate#unstable-features +#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] +pub struct Id(u64); + +/// Returns the [`Id`] of the currently running task. +/// +/// # Panics +/// +/// This function panics if called from outside a task. Please note that calls +/// to `block_on` do not have task IDs, so the method will panic if called from +/// within a call to `block_on`. For a version of this function that doesn't +/// panic, see [`task::try_id()`](crate::runtime::task::try_id()). +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [task ID]: crate::task::Id +/// [unstable]: crate#unstable-features +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[track_caller] +pub fn id() -> Id { + context::current_task_id().expect("Can't get a task id when not inside a task") +} + +/// Returns the [`Id`] of the currently running task, or `None` if called outside +/// of a task. +/// +/// This function is similar to [`task::id()`](crate::runtime::task::id()), except +/// that it returns `None` rather than panicking if called outside of a task +/// context. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// [task ID]: crate::task::Id +/// [unstable]: crate#unstable-features +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[track_caller] +pub fn try_id() -> Option<Id> { + context::current_task_id() +} + /// An owned handle to the task, tracked by ref count. #[repr(transparent)] pub(crate) struct Task<S: 'static> { @@ -230,6 +336,11 @@ pub(crate) trait Schedule: Sync + Sized + 'static { fn yield_now(&self, task: Notified<Self>) { self.schedule(task); } + + /// Polling the task resulted in a panic. Should the runtime shutdown? + fn unhandled_panic(&self) { + // By default, do nothing. This maintains the 1.0 behavior. + } } cfg_rt! { @@ -239,14 +350,15 @@ cfg_rt! { /// notification. fn new_task<T, S>( task: T, - scheduler: S + scheduler: S, + id: Id, ) -> (Task<S>, Notified<S>, JoinHandle<T::Output>) where S: Schedule, T: Future + 'static, T::Output: 'static, { - let raw = RawTask::new::<T, S>(task, scheduler); + let raw = RawTask::new::<T, S>(task, scheduler, id); let task = Task { raw, _p: PhantomData, @@ -264,13 +376,13 @@ cfg_rt! { /// only when the task is not going to be stored in an `OwnedTasks` list. /// /// Currently only blocking tasks use this method. - pub(crate) fn unowned<T, S>(task: T, scheduler: S) -> (UnownedTask<S>, JoinHandle<T::Output>) + pub(crate) fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>) where S: Schedule, T: Send + Future + 'static, T::Output: Send + 'static, { - let (task, notified, join) = new_task(task, scheduler); + let (task, notified, join) = new_task(task, scheduler, id); // This transfers the ref-count of task and notified into an UnownedTask. // This is valid because an UnownedTask holds two ref-counts. @@ -296,6 +408,10 @@ impl<S: 'static> Task<S> { fn header(&self) -> &Header { self.raw.header() } + + fn header_ptr(&self) -> NonNull<Header> { + self.raw.header_ptr() + } } impl<S: 'static> Notified<S> { @@ -313,7 +429,7 @@ cfg_rt_multi_thread! { impl<S: 'static> Task<S> { fn into_raw(self) -> NonNull<Header> { - let ret = self.header().into(); + let ret = self.raw.header_ptr(); mem::forget(self); ret } @@ -327,7 +443,7 @@ cfg_rt_multi_thread! { } impl<S: Schedule> Task<S> { - /// Pre-emptively cancels the task as part of the shutdown process. + /// Preemptively cancels the task as part of the shutdown process. pub(crate) fn shutdown(self) { let raw = self.raw; mem::forget(self); @@ -347,6 +463,7 @@ impl<S: Schedule> LocalNotified<S> { impl<S: Schedule> UnownedTask<S> { // Used in test of the inject queue. #[cfg(test)] + #[cfg_attr(tokio_wasm, allow(dead_code))] pub(super) fn into_notified(self) -> Notified<S> { Notified(self.into_task()) } @@ -426,7 +543,7 @@ unsafe impl<S> linked_list::Link for Task<S> { type Target = Header; fn as_raw(handle: &Task<S>) -> NonNull<Header> { - handle.header().into() + handle.raw.header_ptr() } unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> { @@ -434,7 +551,69 @@ unsafe impl<S> linked_list::Link for Task<S> { } unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> { - // Not super great as it avoids some of looms checking... - NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr)) + self::core::Trailer::addr_of_owned(Header::get_trailer(target)) + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl Id { + // When 64-bit atomics are available, use a static `AtomicU64` counter to + // generate task IDs. + // + // Note(eliza): we _could_ just use `crate::loom::AtomicU64`, which switches + // between an atomic and mutex-based implementation here, rather than having + // two separate functions for targets with and without 64-bit atomics. + // However, because we can't use the mutex-based implementation in a static + // initializer directly, the 32-bit impl also has to use a `OnceCell`, and I + // thought it was nicer to avoid the `OnceCell` overhead on 64-bit + // platforms... + cfg_has_atomic_u64! { + pub(crate) fn next() -> Self { + use std::sync::atomic::{AtomicU64, Ordering::Relaxed}; + static NEXT_ID: AtomicU64 = AtomicU64::new(1); + Self(NEXT_ID.fetch_add(1, Relaxed)) + } + } + + cfg_not_has_atomic_u64! { + cfg_has_const_mutex_new! { + pub(crate) fn next() -> Self { + use crate::loom::sync::Mutex; + static NEXT_ID: Mutex<u64> = Mutex::const_new(1); + + let mut lock = NEXT_ID.lock(); + let id = *lock; + *lock += 1; + Self(id) + } + } + + cfg_not_has_const_mutex_new! { + pub(crate) fn next() -> Self { + use crate::util::once_cell::OnceCell; + use crate::loom::sync::Mutex; + + fn init_next_id() -> Mutex<u64> { + Mutex::new(1) + } + + static NEXT_ID: OnceCell<Mutex<u64>> = OnceCell::new(); + + let next_id = NEXT_ID.get(init_next_id); + let mut lock = next_id.lock(); + let id = *lock; + *lock += 1; + Self(id) + } + } + } + + pub(crate) fn as_u64(&self) -> u64 { + self.0 } } |