author     Joel Galenson <jgalenson@google.com>   2020-10-05 08:16:15 -0700
committer  Joel Galenson <jgalenson@google.com>   2020-10-05 08:16:15 -0700
commit     f03b3ba785a6d336884bfc525046906f8c2a9904 (patch)
tree       14e2bd707d8d152ea0476ec9e686deb2a2f55b34 /src/runtime/task
parent     40b8b369b069afb314a9d4bb92be1bdd038979f8 (diff)
Import tokio-0.2.22
Test: None
Change-Id: Iea7ee5e62819c9b16dbfad05a6146775df72506a
Diffstat (limited to 'src/runtime/task')
-rw-r--r--   src/runtime/task/core.rs      289
-rw-r--r--   src/runtime/task/error.rs     163
-rw-r--r--   src/runtime/task/harness.rs   371
-rw-r--r--   src/runtime/task/join.rs      156
-rw-r--r--   src/runtime/task/mod.rs       220
-rw-r--r--   src/runtime/task/raw.rs       131
-rw-r--r--   src/runtime/task/stack.rs      83
-rw-r--r--   src/runtime/task/state.rs     446
-rw-r--r--   src/runtime/task/waker.rs     101
9 files changed, 1960 insertions, 0 deletions
diff --git a/src/runtime/task/core.rs b/src/runtime/task/core.rs
new file mode 100644
index 0000000..f4756c2
--- /dev/null
+++ b/src/runtime/task/core.rs
@@ -0,0 +1,289 @@
+//! Core task module.
+//!
+//! # Safety
+//!
+//! The functions in this module are private to the `task` module. All of them
+//! should be considered `unsafe` to use, but are not marked as such since it
+//! would be too noisy.
+//!
+//! Make sure to consult the relevant safety section of each function before
+//! use.
+
+use crate::loom::cell::UnsafeCell;
+use crate::runtime::task::raw::{self, Vtable};
+use crate::runtime::task::state::State;
+use crate::runtime::task::waker::waker_ref;
+use crate::runtime::task::{Notified, Schedule, Task};
+use crate::util::linked_list;
+
+use std::future::Future;
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::task::{Context, Poll, Waker};
+
+/// The task cell. Contains the components of the task.
+///
+/// It is critical for `Header` to be the first field as the task structure will
+/// be referenced by both *mut Cell and *mut Header.
+#[repr(C)]
+pub(super) struct Cell<T: Future, S> {
+ /// Hot task state data
+ pub(super) header: Header,
+
+ /// Either the future or output, depending on the execution stage.
+ pub(super) core: Core<T, S>,
+
+ /// Cold data
+ pub(super) trailer: Trailer,
+}
+
+/// The core of the task.
+///
+/// Holds the future or output, depending on the stage of execution.
+pub(super) struct Core<T: Future, S> {
+ /// Scheduler used to drive this future
+ pub(super) scheduler: UnsafeCell<Option<S>>,
+
+ /// Either the future or the output
+ pub(super) stage: UnsafeCell<Stage<T>>,
+}
+
+/// Crate public as this is also needed by the pool.
+#[repr(C)]
+pub(crate) struct Header {
+ /// Task state
+ pub(super) state: State,
+
+ pub(crate) owned: UnsafeCell<linked_list::Pointers<Header>>,
+
+ /// Pointer to next task, used with the injection queue
+ pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>,
+
+ /// Pointer to the next task in the transfer stack
+ pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>,
+
+ /// Table of function pointers for executing actions on the task.
+ pub(super) vtable: &'static Vtable,
+}
+
+unsafe impl Send for Header {}
+unsafe impl Sync for Header {}
+
+/// Cold data is stored after the future.
+pub(super) struct Trailer {
+ /// Consumer task waiting on completion of this task.
+ pub(super) waker: UnsafeCell<Option<Waker>>,
+}
+
+/// Either the future or the output.
+pub(super) enum Stage<T: Future> {
+ Running(T),
+ Finished(super::Result<T::Output>),
+ Consumed,
+}
+
+impl<T: Future, S: Schedule> Cell<T, S> {
+ /// Allocates a new task cell, containing the header, trailer, and core
+ /// structures.
+ pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
+ Box::new(Cell {
+ header: Header {
+ state,
+ owned: UnsafeCell::new(linked_list::Pointers::new()),
+ queue_next: UnsafeCell::new(None),
+ stack_next: UnsafeCell::new(None),
+ vtable: raw::vtable::<T, S>(),
+ },
+ core: Core {
+ scheduler: UnsafeCell::new(None),
+ stage: UnsafeCell::new(Stage::Running(future)),
+ },
+ trailer: Trailer {
+ waker: UnsafeCell::new(None),
+ },
+ })
+ }
+}
+
+impl<T: Future, S: Schedule> Core<T, S> {
+ /// Bind a scheduler to the task.
+ ///
+ /// This only happens on the first poll and must be preceded by a call to
+ /// `is_bound` to determine whether binding is appropriate.
+ ///
+ /// # Safety
+ ///
+ /// Binding must not be done concurrently since it will mutate the task
+ /// core through a shared reference.
+ pub(super) fn bind_scheduler(&self, task: Task<S>) {
+ // This function may be called concurrently, but the __first__ time it
+ // is called, the caller has unique access to this field. All subsequent
+ // concurrent calls will be via the `Waker`, which will "happens after"
+ // the first poll.
+ //
+ // In other words, it is always safe to read the field and it is safe to
+ // write to the field when it is `None`.
+ debug_assert!(!self.is_bound());
+
+ // Bind the task to the scheduler
+ let scheduler = S::bind(task);
+
+ // Safety: As `scheduler` is not set, this is the first poll
+ self.scheduler.with_mut(|ptr| unsafe {
+ *ptr = Some(scheduler);
+ });
+ }
+
+ /// Returns true if the task is bound to a scheduler.
+ pub(super) fn is_bound(&self) -> bool {
+ // Safety: never called concurrently w/ a mutation.
+ self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
+ }
+
+ /// Poll the future
+ ///
+ /// # Safety
+ ///
+ /// The caller must ensure it is safe to mutate the `stage` field. This
+ /// requires mutual exclusion with any concurrent thread that
+ /// might modify the future or output field.
+ ///
+ /// The mutual exclusion is implemented by `Harness` and the `Lifecycle`
+ /// component of the task state.
+ ///
+ /// `self` must also be pinned. This is handled by storing the task on the
+ /// heap.
+ pub(super) fn poll(&self, header: &Header) -> Poll<T::Output> {
+ let res = {
+ self.stage.with_mut(|ptr| {
+ // Safety: The caller ensures mutual exclusion to the field.
+ let future = match unsafe { &mut *ptr } {
+ Stage::Running(future) => future,
+ _ => unreachable!("unexpected stage"),
+ };
+
+ // Safety: The caller ensures the future is pinned.
+ let future = unsafe { Pin::new_unchecked(future) };
+
+ // The waker passed into the `poll` function does not require a ref
+ // count increment.
+ let waker_ref = waker_ref::<T, S>(header);
+ let mut cx = Context::from_waker(&*waker_ref);
+
+ future.poll(&mut cx)
+ })
+ };
+
+ if res.is_ready() {
+ self.drop_future_or_output();
+ }
+
+ res
+ }
+
+ /// Drop the future
+ ///
+ /// # Safety
+ ///
+ /// The caller must ensure it is safe to mutate the `stage` field.
+ pub(super) fn drop_future_or_output(&self) {
+ self.stage.with_mut(|ptr| {
+ // Safety: The caller ensures mutual exclusion to the field.
+ unsafe { *ptr = Stage::Consumed };
+ });
+ }
+
+ /// Store the task output
+ ///
+ /// # Safety
+ ///
+ /// The caller must ensure it is safe to mutate the `stage` field.
+ pub(super) fn store_output(&self, output: super::Result<T::Output>) {
+ self.stage.with_mut(|ptr| {
+ // Safety: the caller ensures mutual exclusion to the field.
+ unsafe { *ptr = Stage::Finished(output) };
+ });
+ }
+
+ /// Take the task output
+ ///
+ /// # Safety
+ ///
+ /// The caller must ensure it is safe to mutate the `stage` field.
+ pub(super) fn take_output(&self) -> super::Result<T::Output> {
+ use std::mem;
+
+ self.stage.with_mut(|ptr| {
+ // Safety: the caller ensures mutual exclusion to the field.
+ match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
+ Stage::Finished(output) => output,
+ _ => panic!("unexpected task state"),
+ }
+ })
+ }
+
+ /// Schedule the future for execution
+ pub(super) fn schedule(&self, task: Notified<S>) {
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.schedule(task),
+ None => panic!("no scheduler set"),
+ }
+ });
+ }
+
+ /// Schedule the future for execution in the near future, yielding the
+ /// thread to other tasks.
+ pub(super) fn yield_now(&self, task: Notified<S>) {
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.yield_now(task),
+ None => panic!("no scheduler set"),
+ }
+ });
+ }
+
+ /// Release the task
+ ///
+ /// If the `Scheduler` implementation is able to, it returns the `Task`
+ /// handle immediately. The caller of this function will batch a ref-dec
+ /// with a state change.
+ pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
+ use std::mem::ManuallyDrop;
+
+ let task = ManuallyDrop::new(task);
+
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.release(&*task),
+ // Task was never polled
+ None => None,
+ }
+ })
+ }
+}
+
+cfg_rt_threaded! {
+ impl Header {
+ pub(crate) fn shutdown(&self) {
+ use crate::runtime::task::RawTask;
+
+ let task = unsafe { RawTask::from_raw(self.into()) };
+ task.shutdown();
+ }
+ }
+}
+
+#[test]
+#[cfg(not(loom))]
+fn header_lte_cache_line() {
+ use std::mem::size_of;
+
+ assert!(size_of::<Header>() <= 8 * size_of::<*const ()>());
+}
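
The `#[repr(C)]` + header-first layout called out above is what lets the same allocation be viewed through both `*mut Cell<T, S>` and `*mut Header`; `Harness::from_raw` later relies on this when it casts the header pointer back to a cell pointer. A minimal stand-alone sketch of the technique, using hypothetical `MyHeader`/`MyCell` types rather than the tokio structs:

use std::ptr::NonNull;

// With #[repr(C)], the first field starts at offset 0, so a pointer to the
// whole struct is also a valid pointer to that field.
#[repr(C)]
struct MyHeader {
    state: usize,
}

#[repr(C)]
struct MyCell<T> {
    header: MyHeader,
    payload: T,
}

fn main() {
    let cell = Box::new(MyCell { header: MyHeader { state: 7 }, payload: "hi" });
    let cell_ptr = NonNull::from(Box::leak(cell));

    // Type-erased view: keep only a header pointer.
    let header_ptr: NonNull<MyHeader> = cell_ptr.cast();
    assert_eq!(unsafe { header_ptr.as_ref() }.state, 7);

    // Recover the full cell once the concrete type is known again.
    let cell_ptr: NonNull<MyCell<&str>> = header_ptr.cast();
    let cell = unsafe { Box::from_raw(cell_ptr.as_ptr()) };
    assert_eq!(cell.payload, "hi");
}
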
diff --git a/src/runtime/task/error.rs b/src/runtime/task/error.rs
new file mode 100644
index 0000000..d5f65a4
--- /dev/null
+++ b/src/runtime/task/error.rs
@@ -0,0 +1,163 @@
+use std::any::Any;
+use std::fmt;
+use std::io;
+use std::sync::Mutex;
+
+doc_rt_core! {
+ /// Task failed to execute to completion.
+ pub struct JoinError {
+ repr: Repr,
+ }
+}
+
+enum Repr {
+ Cancelled,
+ Panic(Mutex<Box<dyn Any + Send + 'static>>),
+}
+
+impl JoinError {
+ #[doc(hidden)]
+ #[deprecated]
+ pub fn cancelled() -> JoinError {
+ Self::cancelled2()
+ }
+
+ pub(crate) fn cancelled2() -> JoinError {
+ JoinError {
+ repr: Repr::Cancelled,
+ }
+ }
+
+ #[doc(hidden)]
+ #[deprecated]
+ pub fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError {
+ Self::panic2(err)
+ }
+
+ pub(crate) fn panic2(err: Box<dyn Any + Send + 'static>) -> JoinError {
+ JoinError {
+ repr: Repr::Panic(Mutex::new(err)),
+ }
+ }
+
+ /// Returns true if the error was caused by the task being cancelled
+ pub fn is_cancelled(&self) -> bool {
+ match &self.repr {
+ Repr::Cancelled => true,
+ _ => false,
+ }
+ }
+
+ /// Returns true if the error was caused by the task panicking
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::panic;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let err = tokio::spawn(async {
+ /// panic!("boom");
+ /// }).await.unwrap_err();
+ ///
+ /// assert!(err.is_panic());
+ /// }
+ /// ```
+ pub fn is_panic(&self) -> bool {
+ match &self.repr {
+ Repr::Panic(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Consumes the join error, returning the object with which the task panicked.
+ ///
+ /// # Panics
+ ///
+ /// `into_panic()` panics if the `Error` does not represent the underlying
+ /// task terminating with a panic. Use `is_panic` to check the error reason
+ /// or `try_into_panic` for a variant that does not panic.
+ ///
+ /// # Examples
+ ///
+ /// ```should_panic
+ /// use std::panic;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let err = tokio::spawn(async {
+ /// panic!("boom");
+ /// }).await.unwrap_err();
+ ///
+ /// if err.is_panic() {
+ /// // Resume the panic on the main task
+ /// panic::resume_unwind(err.into_panic());
+ /// }
+ /// }
+ /// ```
+ pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
+ self.try_into_panic()
+ .expect("`JoinError` reason is not a panic.")
+ }
+
+ /// Consumes the join error, returning the object with which the task
+ /// panicked if the task terminated due to a panic. Otherwise, `self` is
+ /// returned.
+ ///
+ /// # Examples
+ ///
+ /// ```should_panic
+ /// use std::panic;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let err = tokio::spawn(async {
+ /// panic!("boom");
+ /// }).await.unwrap_err();
+ ///
+ /// if let Ok(reason) = err.try_into_panic() {
+ /// // Resume the panic on the main task
+ /// panic::resume_unwind(reason);
+ /// }
+ /// }
+ /// ```
+ pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError> {
+ match self.repr {
+ Repr::Panic(p) => Ok(p.into_inner().expect("Extracting panic from mutex")),
+ _ => Err(self),
+ }
+ }
+}
+
+impl fmt::Display for JoinError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match &self.repr {
+ Repr::Cancelled => write!(fmt, "cancelled"),
+ Repr::Panic(_) => write!(fmt, "panic"),
+ }
+ }
+}
+
+impl fmt::Debug for JoinError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match &self.repr {
+ Repr::Cancelled => write!(fmt, "JoinError::Cancelled"),
+ Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"),
+ }
+ }
+}
+
+impl std::error::Error for JoinError {}
+
+impl From<JoinError> for io::Error {
+ fn from(src: JoinError) -> io::Error {
+ io::Error::new(
+ io::ErrorKind::Other,
+ match src.repr {
+ Repr::Cancelled => "task was cancelled",
+ Repr::Panic(_) => "task panicked",
+ },
+ )
+ }
+}
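
As a usage sketch only (assuming a tokio 0.2 runtime with the `macros` feature enabled, as in the doc examples above), calling code might branch on a `JoinError` and lean on the `From<JoinError> for io::Error` impl like this:

use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let handle = tokio::spawn(async { 21 * 2 });

    match handle.await {
        Ok(value) => println!("task finished with {}", value),
        Err(err) if err.is_cancelled() => println!("task was cancelled"),
        Err(err) if err.is_panic() => println!("task panicked"),
        // The `From<JoinError> for io::Error` impl above also lets `?` work
        // in functions returning `io::Result`.
        Err(err) => return Err(err.into()),
    }

    Ok(())
}
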
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs
new file mode 100644
index 0000000..e86b29e
--- /dev/null
+++ b/src/runtime/task/harness.rs
@@ -0,0 +1,371 @@
+use crate::runtime::task::core::{Cell, Core, Header, Trailer};
+use crate::runtime::task::state::Snapshot;
+use crate::runtime::task::{JoinError, Notified, Schedule, Task};
+
+use std::future::Future;
+use std::mem;
+use std::panic;
+use std::ptr::NonNull;
+use std::task::{Poll, Waker};
+
+/// Typed raw task handle
+pub(super) struct Harness<T: Future, S: 'static> {
+ cell: NonNull<Cell<T, S>>,
+}
+
+impl<T, S> Harness<T, S>
+where
+ T: Future,
+ S: 'static,
+{
+ pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
+ Harness {
+ cell: ptr.cast::<Cell<T, S>>(),
+ }
+ }
+
+ fn header(&self) -> &Header {
+ unsafe { &self.cell.as_ref().header }
+ }
+
+ fn trailer(&self) -> &Trailer {
+ unsafe { &self.cell.as_ref().trailer }
+ }
+
+ fn core(&self) -> &Core<T, S> {
+ unsafe { &self.cell.as_ref().core }
+ }
+}
+
+impl<T, S> Harness<T, S>
+where
+ T: Future,
+ S: Schedule,
+{
+ /// Polls the inner future.
+ ///
+ /// All necessary state checks and transitions are performed.
+ ///
+ /// Panics raised while polling the future are handled.
+ pub(super) fn poll(self) {
+ // If this is the first time the task is polled, the task will be bound
+ // to the scheduler, in which case the task ref count must be
+ // incremented.
+ let is_not_bound = !self.core().is_bound();
+
+ // Transition the task to the running state.
+ //
+ // A failure to transition here indicates the task has been cancelled
+ // while in the run queue pending execution.
+ let snapshot = match self.header().state.transition_to_running(is_not_bound) {
+ Ok(snapshot) => snapshot,
+ Err(_) => {
+ // The task was shutdown while in the run queue. At this point,
+ // we just hold a ref counted reference. Drop it here.
+ self.drop_reference();
+ return;
+ }
+ };
+
+ if is_not_bound {
+ // Ensure the task is bound to a scheduler instance. Since this is
+ // the first time polling the task, a scheduler instance is pulled
+ // from the local context and assigned to the task.
+ //
+ // The scheduler maintains ownership of the task and responds to
+ // `wake` calls.
+ //
+ // The task reference count has been incremented.
+ //
+ // Safety: We have unique access to the task here, so it is safe to
+ // call `bind_scheduler`.
+ self.core().bind_scheduler(self.to_task());
+ }
+
+ // The transition to `Running` done above ensures that a lock on the
+ // future has been obtained. This also ensures the `*mut T` pointer
+ // contains the future (as opposed to the output) and is initialized.
+
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ struct Guard<'a, T: Future, S: Schedule> {
+ core: &'a Core<T, S>,
+ }
+
+ impl<T: Future, S: Schedule> Drop for Guard<'_, T, S> {
+ fn drop(&mut self) {
+ self.core.drop_future_or_output();
+ }
+ }
+
+ let guard = Guard { core: self.core() };
+
+ // If the task is cancelled, avoid polling it and instead signal that
+ // it is complete.
+ if snapshot.is_cancelled() {
+ Poll::Ready(Err(JoinError::cancelled2()))
+ } else {
+ let res = guard.core.poll(self.header());
+
+ // prevent the guard from dropping the future
+ mem::forget(guard);
+
+ res.map(Ok)
+ }
+ }));
+
+ match res {
+ Ok(Poll::Ready(out)) => {
+ self.complete(out, snapshot.is_join_interested());
+ }
+ Ok(Poll::Pending) => {
+ match self.header().state.transition_to_idle() {
+ Ok(snapshot) => {
+ if snapshot.is_notified() {
+ // Signal yield
+ self.core().yield_now(Notified(self.to_task()));
+ // The ref-count was incremented as part of
+ // `transition_to_idle`.
+ self.drop_reference();
+ }
+ }
+ Err(_) => self.cancel_task(),
+ }
+ }
+ Err(err) => {
+ self.complete(Err(JoinError::panic2(err)), snapshot.is_join_interested());
+ }
+ }
+ }
+
+ pub(super) fn dealloc(self) {
+ // Release the join waker, if there is one.
+ self.trailer().waker.with_mut(|_| ());
+
+ // Check causality
+ self.core().stage.with_mut(|_| {});
+ self.core().scheduler.with_mut(|_| {});
+
+ unsafe {
+ drop(Box::from_raw(self.cell.as_ptr()));
+ }
+ }
+
+ // ===== join handle =====
+
+ /// Read the task output into `dst`.
+ pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
+ // Load a snapshot of the current task state
+ let snapshot = self.header().state.load();
+
+ debug_assert!(snapshot.is_join_interested());
+
+ if !snapshot.is_complete() {
+ // The waker must be stored in the task struct.
+ let res = if snapshot.has_join_waker() {
+ // There already is a waker stored in the struct. If it matches
+ // the provided waker, then there is no further work to do.
+ // Otherwise, the waker must be swapped.
+ let will_wake = unsafe {
+ // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
+ // may mutate the `waker` field.
+ self.trailer()
+ .waker
+ .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
+ };
+
+ if will_wake {
+ // The task is not complete **and** the waker is up to date,
+ // there is nothing further that needs to be done.
+ return;
+ }
+
+ // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
+ // field, then update the field with the new join waker.
+ //
+ // This requires two atomic operations, unsetting the bit and
+ // then resetting it. If the task transitions to complete
+ // concurrently to either one of those operations, then setting
+ // the join waker fails and we proceed to reading the task
+ // output.
+ self.header()
+ .state
+ .unset_waker()
+ .and_then(|snapshot| self.set_join_waker(waker.clone(), snapshot))
+ } else {
+ self.set_join_waker(waker.clone(), snapshot)
+ };
+
+ match res {
+ Ok(_) => return,
+ Err(snapshot) => {
+ assert!(snapshot.is_complete());
+ }
+ }
+ }
+
+ *dst = Poll::Ready(self.core().take_output());
+ }
+
+ fn set_join_waker(&self, waker: Waker, snapshot: Snapshot) -> Result<Snapshot, Snapshot> {
+ assert!(snapshot.is_join_interested());
+ assert!(!snapshot.has_join_waker());
+
+ // Safety: Only the `JoinHandle` may set the `waker` field. When
+ // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
+ unsafe {
+ self.trailer().waker.with_mut(|ptr| {
+ *ptr = Some(waker);
+ });
+ }
+
+ // Update the `JoinWaker` state accordingly
+ let res = self.header().state.set_join_waker();
+
+ // If the state could not be updated, then clear the join waker
+ if res.is_err() {
+ unsafe {
+ self.trailer().waker.with_mut(|ptr| {
+ *ptr = None;
+ });
+ }
+ }
+
+ res
+ }
+
+ pub(super) fn drop_join_handle_slow(self) {
+ // Try to unset `JOIN_INTEREST`. This must be done as a first step in
+ // case the task concurrently completed.
+ if self.header().state.unset_join_interested().is_err() {
+ // It is our responsibility to drop the output. This is critical as
+ // the task output may not be `Send` and as such must remain with
+ // the scheduler or `JoinHandle`. i.e. if the output remains in the
+ // task structure until the task is deallocated, it may be dropped
+ // by a Waker on any arbitrary thread.
+ self.core().drop_future_or_output();
+ }
+
+ // Drop the `JoinHandle` reference, possibly deallocating the task
+ self.drop_reference();
+ }
+
+ // ===== waker behavior =====
+
+ pub(super) fn wake_by_val(self) {
+ self.wake_by_ref();
+ self.drop_reference();
+ }
+
+ pub(super) fn wake_by_ref(&self) {
+ if self.header().state.transition_to_notified() {
+ self.core().schedule(Notified(self.to_task()));
+ }
+ }
+
+ pub(super) fn drop_reference(self) {
+ if self.header().state.ref_dec() {
+ self.dealloc();
+ }
+ }
+
+ /// Forcibly shutdown the task
+ ///
+ /// Attempt to transition to `Running` in order to forcibly shutdown the
+ /// task. If the task is currently running or in a state of completion, then
+ /// there is nothing further to do. When the task completes running, it will
+ /// notice the `CANCELLED` bit and finalize the task.
+ pub(super) fn shutdown(self) {
+ if !self.header().state.transition_to_shutdown() {
+ // The task is concurrently running. No further work needed.
+ return;
+ }
+
+ // By transitioning the lifecycle to `Running`, we have permission to
+ // drop the future.
+ self.cancel_task();
+ }
+
+ // ====== internal ======
+
+ fn cancel_task(self) {
+ // Drop the future from a panic guard.
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ self.core().drop_future_or_output();
+ }));
+
+ if let Err(err) = res {
+ // Dropping the future panicked, complete the join
+ // handle with the panic to avoid dropping the panic
+ // on the ground.
+ self.complete(Err(JoinError::panic2(err)), true);
+ } else {
+ self.complete(Err(JoinError::cancelled2()), true);
+ }
+ }
+
+ fn complete(mut self, output: super::Result<T::Output>, is_join_interested: bool) {
+ if is_join_interested {
+ // Store the output. The future has already been dropped
+ //
+ // Safety: Mutual exclusion is obtained by having transitioned the task
+ // state -> Running
+ self.core().store_output(output);
+
+ // Transition to `Complete`, notifying the `JoinHandle` if necessary.
+ self.transition_to_complete();
+ }
+
+ // The task has completed execution and will no longer be scheduled.
+ //
+ // Attempts to batch a ref-dec with the state transition below.
+ let ref_dec = if self.core().is_bound() {
+ if let Some(task) = self.core().release(self.to_task()) {
+ mem::forget(task);
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ };
+
+ // This might deallocate
+ let snapshot = self
+ .header()
+ .state
+ .transition_to_terminal(!is_join_interested, ref_dec);
+
+ if snapshot.ref_count() == 0 {
+ self.dealloc()
+ }
+ }
+
+ /// Transitions the task's lifecycle to `Complete`. Notifies the
+ /// `JoinHandle` if it still has interest in the completion.
+ fn transition_to_complete(&mut self) {
+ // Transition the task's lifecycle to `Complete` and get a snapshot of
+ // the task's state.
+ let snapshot = self.header().state.transition_to_complete();
+
+ if !snapshot.is_join_interested() {
+ // The `JoinHandle` is not interested in the output of this task. It
+ // is our responsibility to drop the output.
+ self.core().drop_future_or_output();
+ } else if snapshot.has_join_waker() {
+ // Notify the join handle. The previous transition obtains the
+ // lock on the waker cell.
+ self.wake_join();
+ }
+ }
+
+ fn wake_join(&self) {
+ self.trailer().waker.with(|ptr| match unsafe { &*ptr } {
+ Some(waker) => waker.wake_by_ref(),
+ None => panic!("waker missing"),
+ });
+ }
+
+ fn to_task(&self) -> Task<S> {
+ unsafe { Task::from_raw(self.header().into()) }
+ }
+}
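
`Harness::poll` above uses a drop-guard idiom: the guard's `Drop` impl drops the future if the closure unwinds, and `mem::forget` disarms it on the success path. A stand-alone sketch of that idiom with a hypothetical `Guard` type (not the tokio one):

use std::mem;
use std::panic;

/// Runs `cleanup` on drop unless the guard is disarmed with `mem::forget`.
struct Guard<F: FnMut()> {
    cleanup: F,
}

impl<F: FnMut()> Drop for Guard<F> {
    fn drop(&mut self) {
        (self.cleanup)();
    }
}

fn poll_with_cleanup(should_panic: bool) {
    let unwound = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        let guard = Guard {
            cleanup: || println!("cleaning up after an unwind"),
        };

        if should_panic {
            panic!("boom"); // the guard's Drop runs and performs the cleanup
        }

        // Success path: disarm the guard so the cleanup does not run.
        mem::forget(guard);
    }))
    .is_err();

    println!("unwound: {}", unwound);
}

fn main() {
    poll_with_cleanup(false);
    poll_with_cleanup(true);
}
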
diff --git a/src/runtime/task/join.rs b/src/runtime/task/join.rs
new file mode 100644
index 0000000..3c4aabb
--- /dev/null
+++ b/src/runtime/task/join.rs
@@ -0,0 +1,156 @@
+use crate::runtime::task::RawTask;
+
+use std::fmt;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+doc_rt_core! {
+ /// An owned permission to join on a task (await its termination).
+ ///
+ /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
+ /// a task rather than a thread.
+ ///
+ /// A `JoinHandle` *detaches* the associated task when it is dropped, which
+ /// means that there is no longer any handle to the task, and no way to `join`
+ /// on it.
+ ///
+ /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
+ /// functions.
+ ///
+ /// # Examples
+ ///
+ /// Creation from [`task::spawn`]:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn doc() {
+ /// let join_handle: task::JoinHandle<_> = task::spawn(async {
+ /// // some work here
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// Creation from [`task::spawn_blocking`]:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn doc() {
+ /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
+ /// // some blocking work here
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// Child being detached and outliving its parent:
+ ///
+ /// ```no_run
+ /// use tokio::task;
+ /// use tokio::time;
+ /// use std::time::Duration;
+ ///
+ /// # #[tokio::main] async fn main() {
+ /// let original_task = task::spawn(async {
+ /// let _detached_task = task::spawn(async {
+ /// // Here we sleep to make sure that the first task returns before.
+ /// time::delay_for(Duration::from_millis(10)).await;
+ /// // This will be called, even though the JoinHandle is dropped.
+ /// println!("♫ Still alive ♫");
+ /// });
+ /// });
+ ///
+ /// original_task.await.expect("The task being joined has panicked");
+ /// println!("Original task is joined.");
+ ///
+ /// // We make sure that the new task has time to run, before the main
+ /// // task returns.
+ ///
+ /// time::delay_for(Duration::from_millis(1000)).await;
+ /// # }
+ /// ```
+ ///
+ /// [`task::spawn`]: crate::task::spawn()
+ /// [`task::spawn_blocking`]: crate::task::spawn_blocking
+ /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
+ pub struct JoinHandle<T> {
+ raw: Option<RawTask>,
+ _p: PhantomData<T>,
+ }
+}
+
+unsafe impl<T: Send> Send for JoinHandle<T> {}
+unsafe impl<T: Send> Sync for JoinHandle<T> {}
+
+impl<T> JoinHandle<T> {
+ pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
+ JoinHandle {
+ raw: Some(raw),
+ _p: PhantomData,
+ }
+ }
+}
+
+impl<T> Unpin for JoinHandle<T> {}
+
+impl<T> Future for JoinHandle<T> {
+ type Output = super::Result<T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut ret = Poll::Pending;
+
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ // Raw should always be set. If it is not, this is due to polling after
+ // completion
+ let raw = self
+ .raw
+ .as_ref()
+ .expect("polling after `JoinHandle` already completed");
+
+ // Try to read the task output. If the task is not yet complete, the
+ // waker is stored and is notified once the task does complete.
+ //
+ // The function must go via the vtable, which requires erasing generic
+ // types. To do this, the function "return" is placed on the stack
+ // **before** calling the function and is passed into the function using
+ // `*mut ()`.
+ //
+ // Safety:
+ //
+ // The type of `T` must match the task's output type.
+ unsafe {
+ raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
+ }
+
+ if ret.is_ready() {
+ coop.made_progress();
+ }
+
+ ret
+ }
+}
+
+impl<T> Drop for JoinHandle<T> {
+ fn drop(&mut self) {
+ if let Some(raw) = self.raw.take() {
+ if raw.header().state.drop_join_handle_fast().is_ok() {
+ return;
+ }
+
+ raw.drop_join_handle_slow();
+ }
+ }
+}
+
+impl<T> fmt::Debug for JoinHandle<T>
+where
+ T: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("JoinHandle").finish()
+ }
+}
diff --git a/src/runtime/task/mod.rs b/src/runtime/task/mod.rs
new file mode 100644
index 0000000..17b5157
--- /dev/null
+++ b/src/runtime/task/mod.rs
@@ -0,0 +1,220 @@
+mod core;
+use self::core::Cell;
+pub(crate) use self::core::Header;
+
+mod error;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::error::JoinError;
+
+mod harness;
+use self::harness::Harness;
+
+mod join;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::join::JoinHandle;
+
+mod raw;
+use self::raw::RawTask;
+
+mod state;
+use self::state::State;
+
+mod waker;
+
+cfg_rt_threaded! {
+ mod stack;
+ pub(crate) use self::stack::TransferStack;
+}
+
+use crate::util::linked_list;
+
+use std::future::Future;
+use std::marker::PhantomData;
+use std::ptr::NonNull;
+use std::{fmt, mem};
+
+/// An owned handle to the task, tracked by ref count
+#[repr(transparent)]
+pub(crate) struct Task<S: 'static> {
+ raw: RawTask,
+ _p: PhantomData<S>,
+}
+
+unsafe impl<S> Send for Task<S> {}
+unsafe impl<S> Sync for Task<S> {}
+
+/// A task was notified
+#[repr(transparent)]
+pub(crate) struct Notified<S: 'static>(Task<S>);
+
+unsafe impl<S: Schedule> Send for Notified<S> {}
+unsafe impl<S: Schedule> Sync for Notified<S> {}
+
+/// Task result sent back
+pub(crate) type Result<T> = std::result::Result<T, JoinError>;
+
+pub(crate) trait Schedule: Sync + Sized + 'static {
+ /// Bind a task to the executor.
+ ///
+ /// Guaranteed to be called from the thread that called `poll` on the task.
+ /// The returned `Schedule` instance is associated with the task and is used
+ /// as `&self` in the other methods on this trait.
+ fn bind(task: Task<Self>) -> Self;
+
+ /// The task has completed work and is ready to be released. The scheduler
+ /// is free to drop it whenever.
+ ///
+ /// If the scheduler can immediately release the task, it should return
+ /// it as part of the function. This enables the task module to batch
+ /// the ref-dec with other options.
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;
+
+ /// Schedule the task
+ fn schedule(&self, task: Notified<Self>);
+
+ /// Schedule the task to run in the near future, yielding the thread to
+ /// other tasks.
+ fn yield_now(&self, task: Notified<Self>) {
+ self.schedule(task);
+ }
+}
+
+/// Create a new task with an associated join handle
+pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
+where
+ T: Future + Send + 'static,
+ S: Schedule,
+{
+ let raw = RawTask::new::<_, S>(task);
+
+ let task = Task {
+ raw,
+ _p: PhantomData,
+ };
+
+ let join = JoinHandle::new(raw);
+
+ (Notified(task), join)
+}
+
+cfg_rt_util! {
+ /// Create a new `!Send` task with an associated join handle
+ pub(crate) unsafe fn joinable_local<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
+ where
+ T: Future + 'static,
+ S: Schedule,
+ {
+ let raw = RawTask::new::<_, S>(task);
+
+ let task = Task {
+ raw,
+ _p: PhantomData,
+ };
+
+ let join = JoinHandle::new(raw);
+
+ (Notified(task), join)
+ }
+}
+
+impl<S: 'static> Task<S> {
+ pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
+ Task {
+ raw: RawTask::from_raw(ptr),
+ _p: PhantomData,
+ }
+ }
+
+ pub(crate) fn header(&self) -> &Header {
+ self.raw.header()
+ }
+}
+
+cfg_rt_threaded! {
+ impl<S: 'static> Notified<S> {
+ pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> {
+ Notified(Task::from_raw(ptr))
+ }
+
+ pub(crate) fn header(&self) -> &Header {
+ self.0.header()
+ }
+ }
+
+ impl<S: 'static> Task<S> {
+ pub(crate) fn into_raw(self) -> NonNull<Header> {
+ let ret = self.header().into();
+ mem::forget(self);
+ ret
+ }
+ }
+
+ impl<S: 'static> Notified<S> {
+ pub(crate) fn into_raw(self) -> NonNull<Header> {
+ self.0.into_raw()
+ }
+ }
+}
+
+impl<S: Schedule> Task<S> {
+ /// Pre-emptively cancel the task as part of the shutdown process.
+ pub(crate) fn shutdown(&self) {
+ self.raw.shutdown();
+ }
+}
+
+impl<S: Schedule> Notified<S> {
+ /// Run the task
+ pub(crate) fn run(self) {
+ self.0.raw.poll();
+ mem::forget(self);
+ }
+
+ /// Pre-emptively cancel the task as part of the shutdown process.
+ pub(crate) fn shutdown(self) {
+ self.0.shutdown();
+ }
+}
+
+impl<S: 'static> Drop for Task<S> {
+ fn drop(&mut self) {
+ // Decrement the ref count
+ if self.header().state.ref_dec() {
+ // Deallocate if this is the final ref count
+ self.raw.dealloc();
+ }
+ }
+}
+
+impl<S> fmt::Debug for Task<S> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "Task({:p})", self.header())
+ }
+}
+
+impl<S> fmt::Debug for Notified<S> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "task::Notified({:p})", self.0.header())
+ }
+}
+
+/// # Safety
+///
+/// Tasks are pinned
+unsafe impl<S> linked_list::Link for Task<S> {
+ type Handle = Task<S>;
+ type Target = Header;
+
+ fn as_raw(handle: &Task<S>) -> NonNull<Header> {
+ handle.header().into()
+ }
+
+ unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
+ Task::from_raw(ptr)
+ }
+
+ unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
+ // Not super great as it avoids some of loom's checking...
+ NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr))
+ }
+}
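
To make the `Schedule` contract concrete, below is a sketch of a toy FIFO scheduler. The real `Task`, `Notified`, and `Schedule` items are crate-private (and `Schedule` additionally requires `Sync`), so the sketch uses local stand-in types purely to show the shape of the three responsibilities: bind, release, and schedule.

use std::cell::RefCell;
use std::collections::VecDeque;

// Local stand-ins for the crate-private `Task<S>` / `Notified<S>` handles.
struct FakeTask(u64);
struct FakeNotified(FakeTask);

// Mirrors the `Schedule` trait above (minus the `Sync` bound, since the toy
// scheduler below is single-threaded).
trait FakeSchedule: Sized + 'static {
    fn bind(task: FakeTask) -> Self;
    fn release(&self, task: &FakeTask) -> Option<FakeTask>;
    fn schedule(&self, task: FakeNotified);
    fn yield_now(&self, task: FakeNotified) {
        self.schedule(task);
    }
}

/// A toy scheduler backed by a FIFO run queue.
struct QueueScheduler {
    run_queue: RefCell<VecDeque<FakeNotified>>,
}

impl FakeSchedule for QueueScheduler {
    fn bind(_task: FakeTask) -> Self {
        // The real `bind` grabs the scheduler that is polling the task (from
        // thread-local context) and records ownership of the task.
        QueueScheduler {
            run_queue: RefCell::new(VecDeque::new()),
        }
    }

    fn release(&self, task: &FakeTask) -> Option<FakeTask> {
        // A real scheduler removes the task from its owned list here and may
        // return the handle so the caller can batch the ref-count decrement.
        println!("task {} completed", task.0);
        None
    }

    fn schedule(&self, task: FakeNotified) {
        // A wake-up: push the notified task onto the run queue.
        self.run_queue.borrow_mut().push_back(task);
    }
}

fn main() {
    let sched = QueueScheduler::bind(FakeTask(1));
    sched.schedule(FakeNotified(FakeTask(1)));
    assert_eq!(sched.run_queue.borrow().len(), 1);
    sched.release(&FakeTask(1));
}
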
diff --git a/src/runtime/task/raw.rs b/src/runtime/task/raw.rs
new file mode 100644
index 0000000..cae56d0
--- /dev/null
+++ b/src/runtime/task/raw.rs
@@ -0,0 +1,131 @@
+use crate::runtime::task::{Cell, Harness, Header, Schedule, State};
+
+use std::future::Future;
+use std::ptr::NonNull;
+use std::task::{Poll, Waker};
+
+/// Raw task handle
+pub(super) struct RawTask {
+ ptr: NonNull<Header>,
+}
+
+pub(super) struct Vtable {
+ /// Poll the future
+ pub(super) poll: unsafe fn(NonNull<Header>),
+
+ /// Deallocate the memory
+ pub(super) dealloc: unsafe fn(NonNull<Header>),
+
+ /// Read the task output, if complete
+ pub(super) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker),
+
+ /// The join handle has been dropped
+ pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),
+
+ /// Scheduler is being shutdown
+ pub(super) shutdown: unsafe fn(NonNull<Header>),
+}
+
+/// Get the vtable for the requested `T` and `S` generics.
+pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
+ &Vtable {
+ poll: poll::<T, S>,
+ dealloc: dealloc::<T, S>,
+ try_read_output: try_read_output::<T, S>,
+ drop_join_handle_slow: drop_join_handle_slow::<T, S>,
+ shutdown: shutdown::<T, S>,
+ }
+}
+
+impl RawTask {
+ pub(super) fn new<T, S>(task: T) -> RawTask
+ where
+ T: Future,
+ S: Schedule,
+ {
+ let ptr = Box::into_raw(Cell::<_, S>::new(task, State::new()));
+ let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };
+
+ RawTask { ptr }
+ }
+
+ pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask {
+ RawTask { ptr }
+ }
+
+ /// Returns a reference to the task's meta structure.
+ ///
+ /// Safe as `Header` is `Sync`.
+ pub(super) fn header(&self) -> &Header {
+ unsafe { self.ptr.as_ref() }
+ }
+
+ /// Safety: mutual exclusion is required to call this function.
+ pub(super) fn poll(self) {
+ let vtable = self.header().vtable;
+ unsafe { (vtable.poll)(self.ptr) }
+ }
+
+ pub(super) fn dealloc(self) {
+ let vtable = self.header().vtable;
+ unsafe {
+ (vtable.dealloc)(self.ptr);
+ }
+ }
+
+ /// Safety: `dst` must be a `*mut Poll<super::Result<T::Output>>` where `T`
+ /// is the future stored by the task.
+ pub(super) unsafe fn try_read_output(self, dst: *mut (), waker: &Waker) {
+ let vtable = self.header().vtable;
+ (vtable.try_read_output)(self.ptr, dst, waker);
+ }
+
+ pub(super) fn drop_join_handle_slow(self) {
+ let vtable = self.header().vtable;
+ unsafe { (vtable.drop_join_handle_slow)(self.ptr) }
+ }
+
+ pub(super) fn shutdown(self) {
+ let vtable = self.header().vtable;
+ unsafe { (vtable.shutdown)(self.ptr) }
+ }
+}
+
+impl Clone for RawTask {
+ fn clone(&self) -> Self {
+ RawTask { ptr: self.ptr }
+ }
+}
+
+impl Copy for RawTask {}
+
+unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.poll();
+}
+
+unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.dealloc();
+}
+
+unsafe fn try_read_output<T: Future, S: Schedule>(
+ ptr: NonNull<Header>,
+ dst: *mut (),
+ waker: &Waker,
+) {
+ let out = &mut *(dst as *mut Poll<super::Result<T::Output>>);
+
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.try_read_output(out, waker);
+}
+
+unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.drop_join_handle_slow()
+}
+
+unsafe fn shutdown<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.shutdown()
+}
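
The `Vtable` above is hand-rolled type erasure: per-`(T, S)` instantiations of generic functions are stored as plain function pointers and invoked through an untyped pointer, with results written through `*mut ()` the way `try_read_output` does. A self-contained sketch of the same idea with hypothetical types:

use std::ptr::NonNull;

struct ErasedVtable {
    // Writes the value into an output slot passed as `*mut ()`; the caller
    // must point the slot at the matching concrete type.
    read: unsafe fn(NonNull<()>, *mut ()),
    dealloc: unsafe fn(NonNull<()>),
}

struct Erased {
    ptr: NonNull<()>,
    vtable: &'static ErasedVtable,
}

// Same promotion trick as `raw::vtable` above: the literal is a constant, so
// the reference gets a `'static` lifetime.
fn vtable<T: Clone>() -> &'static ErasedVtable {
    &ErasedVtable {
        read: read::<T>,
        dealloc: dealloc::<T>,
    }
}

unsafe fn read<T: Clone>(ptr: NonNull<()>, out: *mut ()) {
    let value = ptr.cast::<T>().as_ref().clone();
    *(out as *mut Option<T>) = Some(value);
}

unsafe fn dealloc<T>(ptr: NonNull<()>) {
    drop(Box::from_raw(ptr.cast::<T>().as_ptr()));
}

fn erase<T: Clone>(value: T) -> Erased {
    let ptr = NonNull::from(Box::leak(Box::new(value))).cast::<()>();
    Erased { ptr, vtable: vtable::<T>() }
}

fn main() {
    let erased = erase(123u32);

    // The caller puts the typed output slot on the stack, like
    // `JoinHandle::poll` does with its `Poll<Result<T>>`.
    let mut out: Option<u32> = None;
    unsafe { (erased.vtable.read)(erased.ptr, &mut out as *mut _ as *mut ()) };
    assert_eq!(out, Some(123));

    unsafe { (erased.vtable.dealloc)(erased.ptr) };
}
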
diff --git a/src/runtime/task/stack.rs b/src/runtime/task/stack.rs
new file mode 100644
index 0000000..9dd8d3f
--- /dev/null
+++ b/src/runtime/task/stack.rs
@@ -0,0 +1,83 @@
+use crate::loom::sync::atomic::AtomicPtr;
+use crate::runtime::task::{Header, Task};
+
+use std::marker::PhantomData;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+
+/// Concurrent stack of tasks, used to pass ownership of a task from one worker
+/// to another.
+pub(crate) struct TransferStack<T: 'static> {
+ head: AtomicPtr<Header>,
+ _p: PhantomData<T>,
+}
+
+impl<T: 'static> TransferStack<T> {
+ pub(crate) fn new() -> TransferStack<T> {
+ TransferStack {
+ head: AtomicPtr::new(ptr::null_mut()),
+ _p: PhantomData,
+ }
+ }
+
+ pub(crate) fn push(&self, task: Task<T>) {
+ let task = task.into_raw();
+
+ // We don't care about any memory associated w/ setting the `head`
+ // field, just the current value.
+ //
+ // The compare-exchange creates a release sequence.
+ let mut curr = self.head.load(Relaxed);
+
+ loop {
+ unsafe {
+ task.as_ref()
+ .stack_next
+ .with_mut(|ptr| *ptr = NonNull::new(curr))
+ };
+
+ let res = self
+ .head
+ .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed);
+
+ match res {
+ Ok(_) => return,
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
+
+ pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> {
+ struct Iter<T: 'static>(Option<NonNull<Header>>, PhantomData<T>);
+
+ impl<T: 'static> Iterator for Iter<T> {
+ type Item = Task<T>;
+
+ fn next(&mut self) -> Option<Task<T>> {
+ let task = self.0?;
+
+ // Move the cursor forward
+ self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) };
+
+ // Return the task
+ unsafe { Some(Task::from_raw(task)) }
+ }
+ }
+
+ impl<T: 'static> Drop for Iter<T> {
+ fn drop(&mut self) {
+ use std::process;
+
+ if self.0.is_some() {
+ // we have bugs
+ process::abort();
+ }
+ }
+ }
+
+ let ptr = self.head.swap(ptr::null_mut(), Acquire);
+ Iter(NonNull::new(ptr), PhantomData)
+ }
+}
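
The `push`/`drain` pair above is the classic lock-free transfer stack: link the new node to the current head, publish with a release compare-exchange (retrying on contention), and let the consumer take the whole list with one acquire swap. A stand-alone sketch of that pattern with plain `Box` nodes and no loom instrumentation (hypothetical `Node`/`Stack` types):

use std::ptr;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

struct Node {
    value: u32,
    next: *mut Node,
}

struct Stack {
    head: AtomicPtr<Node>,
}

impl Stack {
    fn new() -> Stack {
        Stack { head: AtomicPtr::new(ptr::null_mut()) }
    }

    fn push(&self, value: u32) {
        let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
        let mut curr = self.head.load(Relaxed);

        loop {
            // Link the new node to the current head before publishing it.
            unsafe { (*node).next = curr };

            // `Release` makes the node's contents visible to the consumer
            // that later swaps the head out with `Acquire`.
            match self.head.compare_exchange(curr, node, Release, Relaxed) {
                Ok(_) => return,
                Err(actual) => curr = actual,
            }
        }
    }

    /// Takes the entire stack in one shot, like `TransferStack::drain`.
    fn drain(&self) -> Vec<u32> {
        let mut curr = self.head.swap(ptr::null_mut(), Acquire);
        let mut out = Vec::new();

        while !curr.is_null() {
            let node = unsafe { Box::from_raw(curr) };
            out.push(node.value);
            curr = node.next;
        }

        out
    }
}

fn main() {
    let stack = Stack::new();
    stack.push(1);
    stack.push(2);
    assert_eq!(stack.drain(), vec![2, 1]); // LIFO order
}
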
diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs
new file mode 100644
index 0000000..21e9043
--- /dev/null
+++ b/src/runtime/task/state.rs
@@ -0,0 +1,446 @@
+use crate::loom::sync::atomic::AtomicUsize;
+
+use std::fmt;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::usize;
+
+pub(super) struct State {
+ val: AtomicUsize,
+}
+
+/// Current state value
+#[derive(Copy, Clone)]
+pub(super) struct Snapshot(usize);
+
+type UpdateResult = Result<Snapshot, Snapshot>;
+
+/// The task is currently being run.
+const RUNNING: usize = 0b0001;
+
+/// The task is complete.
+///
+/// Once this bit is set, it is never unset
+const COMPLETE: usize = 0b0010;
+
+/// Extracts the task's lifecycle value from the state
+const LIFECYCLE_MASK: usize = 0b11;
+
+/// Flag tracking if the task has been pushed into a run queue.
+const NOTIFIED: usize = 0b100;
+
+/// The join handle is still around
+const JOIN_INTEREST: usize = 0b1_000;
+
+/// A join handle waker has been set
+const JOIN_WAKER: usize = 0b10_000;
+
+/// The task has been forcibly cancelled.
+const CANCELLED: usize = 0b100_000;
+
+/// All bits
+const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
+
+/// Bits used by the ref count portion of the state.
+const REF_COUNT_MASK: usize = !STATE_MASK;
+
+/// Number of positions to shift the ref count
+const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
+
+/// One ref count
+const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
+
+/// State a task is initialized with
+///
+/// A task is initialized with two references: one for the scheduler and one for
+/// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is
+/// set. A new task is immediately pushed into the run queue for execution and
+/// starts with the `NOTIFIED` flag set.
+const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED;
+
+/// All transitions are performed via RMW operations. This establishes an
+/// unambiguous modification order.
+impl State {
+ /// Return a task's initial state
+ pub(super) fn new() -> State {
+ // The initial state is documented on `INITIAL_STATE` above: the task
+ // starts with two references (one for the scheduler, one for the
+ // `JoinHandle`), with `JOIN_INTEREST` set, and with the `NOTIFIED` flag
+ // set since a new task is immediately pushed into the run queue for
+ // execution.
+ State {
+ val: AtomicUsize::new(INITIAL_STATE),
+ }
+ }
+
+ /// Loads the current state, establishes `Acquire` ordering.
+ pub(super) fn load(&self) -> Snapshot {
+ Snapshot(self.val.load(Acquire))
+ }
+
+ /// Attempt to transition the lifecycle to `Running`.
+ ///
+ /// If `ref_inc` is set, the reference count is also incremented.
+ ///
+ /// The `NOTIFIED` bit is always unset.
+ pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_notified());
+
+ let mut next = curr;
+
+ if !next.is_idle() {
+ return None;
+ }
+
+ if ref_inc {
+ next.ref_inc();
+ }
+
+ next.set_running();
+ next.unset_notified();
+ Some(next)
+ })
+ }
+
+ /// Transitions the task from `Running` -> `Idle`.
+ ///
+ /// Returns `Ok` if the transition to `Idle` is successful, `Err` otherwise.
+ /// In both cases, a snapshot of the state from **after** the transition is
+ /// returned.
+ ///
+ /// The transition to `Idle` fails if the task has been flagged to be
+ /// cancelled.
+ pub(super) fn transition_to_idle(&self) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_running());
+
+ if curr.is_cancelled() {
+ return None;
+ }
+
+ let mut next = curr;
+ next.unset_running();
+
+ if next.is_notified() {
+ // The caller needs to schedule the task. To do this, it needs a
+ // waker. The waker requires a ref count.
+ next.ref_inc();
+ }
+
+ Some(next)
+ })
+ }
+
+ /// Transitions the task from `Running` -> `Complete`.
+ pub(super) fn transition_to_complete(&self) -> Snapshot {
+ const DELTA: usize = RUNNING | COMPLETE;
+
+ let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
+ assert!(prev.is_running());
+ assert!(!prev.is_complete());
+
+ Snapshot(prev.0 ^ DELTA)
+ }
+
+ /// Transition from `Complete` -> `Terminal`, decrementing the reference
+ /// count by 1.
+ ///
+ /// When `ref_dec` is set, an additional ref count decrement is performed.
+ /// This is used to batch atomic ops when possible.
+ pub(super) fn transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot {
+ self.fetch_update(|mut snapshot| {
+ if complete {
+ snapshot.set_complete();
+ } else {
+ assert!(snapshot.is_complete());
+ }
+
+ // Decrement the primary handle
+ snapshot.ref_dec();
+
+ if ref_dec {
+ // Decrement a second time
+ snapshot.ref_dec();
+ }
+
+ Some(snapshot)
+ })
+ .unwrap()
+ }
+
+ /// Transitions the state to `NOTIFIED`.
+ ///
+ /// Returns `true` if the task needs to be submitted to the pool for
+ /// execution
+ pub(super) fn transition_to_notified(&self) -> bool {
+ let prev = Snapshot(self.val.fetch_or(NOTIFIED, AcqRel));
+ prev.will_need_queueing()
+ }
+
+ /// Set the `CANCELLED` bit and attempt to transition to `Running`.
+ ///
+ /// Returns `true` if the transition to `Running` succeeded.
+ pub(super) fn transition_to_shutdown(&self) -> bool {
+ let mut prev = Snapshot(0);
+
+ let _ = self.fetch_update(|mut snapshot| {
+ prev = snapshot;
+
+ if snapshot.is_idle() {
+ snapshot.set_running();
+
+ if snapshot.is_notified() {
+ // If the task is idle and notified, this indicates the task is
+ // in the run queue and is considered owned by the scheduler.
+ // The shutdown operation claims ownership of the task, which
+ // means we need to assign an additional ref-count to the task
+ // in the queue.
+ snapshot.ref_inc();
+ }
+ }
+
+ snapshot.set_cancelled();
+ Some(snapshot)
+ });
+
+ prev.is_idle()
+ }
+
+ /// Optimistically tries to swap the state assuming the join handle is
+ /// __immediately__ dropped on spawn
+ pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
+ use std::sync::atomic::Ordering::Relaxed;
+
+ // Relaxed is acceptable because, if this function is called and
+ // succeeds, nothing has been done w/ the join handle.
+ //
+ // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
+ // set, at which point the CAS will fail.
+ //
+ // Given this, there is no risk if this operation is reordered.
+ self.val
+ .compare_exchange_weak(
+ INITIAL_STATE,
+ (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
+ Release,
+ Relaxed,
+ )
+ .map(|_| ())
+ .map_err(|_| ())
+ }
+
+ /// Try to unset the JOIN_INTEREST flag.
+ ///
+ /// Returns `Ok` if the operation happens before the task transitions to a
+ /// completed state, `Err` otherwise.
+ pub(super) fn unset_join_interested(&self) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_join_interested());
+
+ if curr.is_complete() {
+ return None;
+ }
+
+ let mut next = curr;
+ next.unset_join_interested();
+
+ Some(next)
+ })
+ }
+
+ /// Set the `JOIN_WAKER` bit.
+ ///
+ /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
+ /// the task has completed.
+ pub(super) fn set_join_waker(&self) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_join_interested());
+ assert!(!curr.has_join_waker());
+
+ if curr.is_complete() {
+ return None;
+ }
+
+ let mut next = curr;
+ next.set_join_waker();
+
+ Some(next)
+ })
+ }
+
+ /// Unsets the `JOIN_WAKER` bit.
+ ///
+ /// Returns `Ok` if the bit has been unset, `Err` otherwise. This operation fails if
+ /// the task has completed.
+ pub(super) fn unset_waker(&self) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_join_interested());
+ assert!(curr.has_join_waker());
+
+ if curr.is_complete() {
+ return None;
+ }
+
+ let mut next = curr;
+ next.unset_join_waker();
+
+ Some(next)
+ })
+ }
+
+ pub(super) fn ref_inc(&self) {
+ use std::process;
+ use std::sync::atomic::Ordering::Relaxed;
+
+ // Using a relaxed ordering is alright here, as knowledge of the
+ // original reference prevents other threads from erroneously deleting
+ // the object.
+ //
+ // As explained in the [Boost documentation][1], Increasing the
+ // reference counter can always be done with memory_order_relaxed: New
+ // references to an object can only be formed from an existing
+ // reference, and passing an existing reference from one thread to
+ // another must already provide any required synchronization.
+ //
+ // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
+ let prev = self.val.fetch_add(REF_ONE, Relaxed);
+
+ // If the reference count overflowed, abort.
+ if prev > isize::max_value() as usize {
+ process::abort();
+ }
+ }
+
+ /// Returns `true` if the task should be released.
+ pub(super) fn ref_dec(&self) -> bool {
+ let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
+ prev.ref_count() == 1
+ }
+
+ fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
+ where
+ F: FnMut(Snapshot) -> Option<Snapshot>,
+ {
+ let mut curr = self.load();
+
+ loop {
+ let next = match f(curr) {
+ Some(next) => next,
+ None => return Err(curr),
+ };
+
+ let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => return Ok(next),
+ Err(actual) => curr = Snapshot(actual),
+ }
+ }
+ }
+}
+
+// ===== impl Snapshot =====
+
+impl Snapshot {
+ /// Returns `true` if the task is in an idle state.
+ pub(super) fn is_idle(self) -> bool {
+ self.0 & (RUNNING | COMPLETE) == 0
+ }
+
+ /// Returns `true` if the task has been flagged as notified.
+ pub(super) fn is_notified(self) -> bool {
+ self.0 & NOTIFIED == NOTIFIED
+ }
+
+ fn unset_notified(&mut self) {
+ self.0 &= !NOTIFIED
+ }
+
+ pub(super) fn is_running(self) -> bool {
+ self.0 & RUNNING == RUNNING
+ }
+
+ fn set_running(&mut self) {
+ self.0 |= RUNNING;
+ }
+
+ fn unset_running(&mut self) {
+ self.0 &= !RUNNING;
+ }
+
+ pub(super) fn is_cancelled(self) -> bool {
+ self.0 & CANCELLED == CANCELLED
+ }
+
+ fn set_cancelled(&mut self) {
+ self.0 |= CANCELLED;
+ }
+
+ fn set_complete(&mut self) {
+ self.0 |= COMPLETE;
+ }
+
+ /// Returns `true` if the task's future has completed execution.
+ pub(super) fn is_complete(self) -> bool {
+ self.0 & COMPLETE == COMPLETE
+ }
+
+ pub(super) fn is_join_interested(self) -> bool {
+ self.0 & JOIN_INTEREST == JOIN_INTEREST
+ }
+
+ fn unset_join_interested(&mut self) {
+ self.0 &= !JOIN_INTEREST
+ }
+
+ pub(super) fn has_join_waker(self) -> bool {
+ self.0 & JOIN_WAKER == JOIN_WAKER
+ }
+
+ fn set_join_waker(&mut self) {
+ self.0 |= JOIN_WAKER;
+ }
+
+ fn unset_join_waker(&mut self) {
+ self.0 &= !JOIN_WAKER
+ }
+
+ pub(super) fn ref_count(self) -> usize {
+ (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
+ }
+
+ fn ref_inc(&mut self) {
+ assert!(self.0 <= isize::max_value() as usize);
+ self.0 += REF_ONE;
+ }
+
+ pub(super) fn ref_dec(&mut self) {
+ assert!(self.ref_count() > 0);
+ self.0 -= REF_ONE
+ }
+
+ fn will_need_queueing(self) -> bool {
+ !self.is_notified() && self.is_idle()
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let snapshot = self.load();
+ snapshot.fmt(fmt)
+ }
+}
+
+impl fmt::Debug for Snapshot {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Snapshot")
+ .field("is_running", &self.is_running())
+ .field("is_complete", &self.is_complete())
+ .field("is_notified", &self.is_notified())
+ .field("is_cancelled", &self.is_cancelled())
+ .field("is_join_interested", &self.is_join_interested())
+ .field("has_join_waker", &self.has_join_waker())
+ .field("ref_count", &self.ref_count())
+ .finish()
+ }
+}
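
The state machine above packs lifecycle flags and a reference count into a single `AtomicUsize` and performs every transition through a CAS retry loop (`fetch_update`). A small stand-alone sketch of that pattern with two made-up flags and the count in the upper bits:

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire};

const RUNNING: usize = 0b01;
const NOTIFIED: usize = 0b10;
const FLAG_MASK: usize = 0b11;
const REF_ONE: usize = 1 << 2; // ref count stored above the flag bits

struct PackedState {
    val: AtomicUsize,
}

impl PackedState {
    fn new() -> PackedState {
        // Start with one reference and the NOTIFIED flag set.
        PackedState { val: AtomicUsize::new(REF_ONE | NOTIFIED) }
    }

    /// CAS retry loop in the style of `State::fetch_update` above.
    fn update<F>(&self, mut f: F) -> Result<usize, usize>
    where
        F: FnMut(usize) -> Option<usize>,
    {
        let mut curr = self.val.load(Acquire);
        loop {
            let next = match f(curr) {
                Some(next) => next,
                None => return Err(curr),
            };
            match self.val.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => return Ok(next),
                Err(actual) => curr = actual,
            }
        }
    }

    /// Example transition: consume NOTIFIED and set RUNNING atomically.
    fn transition_to_running(&self) -> Result<usize, usize> {
        self.update(|curr| {
            if curr & NOTIFIED == 0 {
                return None; // not notified: refuse the transition
            }
            Some((curr & !NOTIFIED) | RUNNING)
        })
    }

    fn ref_count(&self) -> usize {
        (self.val.load(Acquire) & !FLAG_MASK) >> 2
    }
}

fn main() {
    let state = PackedState::new();
    assert!(state.transition_to_running().is_ok());
    assert!(state.transition_to_running().is_err()); // NOTIFIED already cleared
    assert_eq!(state.ref_count(), 1);
}
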
diff --git a/src/runtime/task/waker.rs b/src/runtime/task/waker.rs
new file mode 100644
index 0000000..5c2d478
--- /dev/null
+++ b/src/runtime/task/waker.rs
@@ -0,0 +1,101 @@
+use crate::runtime::task::harness::Harness;
+use crate::runtime::task::{Header, Schedule};
+
+use std::future::Future;
+use std::marker::PhantomData;
+use std::mem::ManuallyDrop;
+use std::ops;
+use std::ptr::NonNull;
+use std::task::{RawWaker, RawWakerVTable, Waker};
+
+pub(super) struct WakerRef<'a, S: 'static> {
+ waker: ManuallyDrop<Waker>,
+ _p: PhantomData<(&'a Header, S)>,
+}
+
+/// Returns a `WakerRef` which avoids having to pre-emptively increase the
+/// refcount if there is no need to do so.
+pub(super) fn waker_ref<T, S>(header: &Header) -> WakerRef<'_, S>
+where
+ T: Future,
+ S: Schedule,
+{
+ // `Waker::will_wake` uses the VTABLE pointer as part of the check. This
+ // means that `will_wake` will always return false when using the current
+ // task's waker. (discussion at rust-lang/rust#66281).
+ //
+ // To fix this, we use a single vtable. Since we pass in a reference at this
+ // point and not an *owned* waker, we must ensure that `drop` is never
+ // called on this waker instance. This is done by wrapping it with
+ // `ManuallyDrop` and then never calling drop.
+ let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) };
+
+ WakerRef {
+ waker,
+ _p: PhantomData,
+ }
+}
+
+impl<S> ops::Deref for WakerRef<'_, S> {
+ type Target = Waker;
+
+ fn deref(&self) -> &Waker {
+ &self.waker
+ }
+}
+
+unsafe fn clone_waker<T, S>(ptr: *const ()) -> RawWaker
+where
+ T: Future,
+ S: Schedule,
+{
+ let header = ptr as *const Header;
+ (*header).state.ref_inc();
+ raw_waker::<T, S>(header)
+}
+
+unsafe fn drop_waker<T, S>(ptr: *const ())
+where
+ T: Future,
+ S: Schedule,
+{
+ let ptr = NonNull::new_unchecked(ptr as *mut Header);
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.drop_reference();
+}
+
+unsafe fn wake_by_val<T, S>(ptr: *const ())
+where
+ T: Future,
+ S: Schedule,
+{
+ let ptr = NonNull::new_unchecked(ptr as *mut Header);
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.wake_by_val();
+}
+
+// Wake without consuming the waker
+unsafe fn wake_by_ref<T, S>(ptr: *const ())
+where
+ T: Future,
+ S: Schedule,
+{
+ let ptr = NonNull::new_unchecked(ptr as *mut Header);
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.wake_by_ref();
+}
+
+fn raw_waker<T, S>(header: *const Header) -> RawWaker
+where
+ T: Future,
+ S: Schedule,
+{
+ let ptr = header as *const ();
+ let vtable = &RawWakerVTable::new(
+ clone_waker::<T, S>,
+ wake_by_val::<T, S>,
+ wake_by_ref::<T, S>,
+ drop_waker::<T, S>,
+ );
+ RawWaker::new(ptr, vtable)
+}
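
For reference, a self-contained sketch of the std `RawWaker`/`RawWakerVTable` machinery this module builds on, using an `Arc`-backed flag as the wake target instead of a task header (hypothetical `Flag` type):

use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable, Waker};

struct Flag(AtomicBool);

// All four entries operate on a raw `Arc<Flag>` pointer, much like the tokio
// vtable above operates on a raw `Header` pointer.
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_flag);

fn raw_waker(flag: Arc<Flag>) -> RawWaker {
    RawWaker::new(Arc::into_raw(flag) as *const (), &VTABLE)
}

unsafe fn clone(ptr: *const ()) -> RawWaker {
    // Rebuild the Arc temporarily to bump the refcount, then forget it so
    // the original reference stays alive.
    let flag = Arc::from_raw(ptr as *const Flag);
    let cloned = Arc::clone(&flag);
    std::mem::forget(flag);
    raw_waker(cloned)
}

unsafe fn wake(ptr: *const ()) {
    // By-value wake consumes the waker's reference.
    Arc::from_raw(ptr as *const Flag).0.store(true, SeqCst);
}

unsafe fn wake_by_ref(ptr: *const ()) {
    (*(ptr as *const Flag)).0.store(true, SeqCst);
}

unsafe fn drop_flag(ptr: *const ()) {
    drop(Arc::from_raw(ptr as *const Flag));
}

fn main() {
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = unsafe { Waker::from_raw(raw_waker(Arc::clone(&flag))) };

    waker.wake_by_ref();
    assert!(flag.0.load(SeqCst));
}
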