author     Stjepan Glavina <stjepang@gmail.com>  2019-08-12 20:18:51 +0200
committer  Stjepan Glavina <stjepang@gmail.com>  2019-08-12 20:18:51 +0200
commit     1479e86ca668fa9205d6ef50867d9688ffb4d804 (patch)
tree       6d22c859485aacbf81ffee52150a4675febda75f /src
Initial commit
Diffstat (limited to 'src')
-rw-r--r--  src/header.rs       158
-rw-r--r--  src/join_handle.rs  333
-rw-r--r--  src/lib.rs          149
-rw-r--r--  src/raw.rs          629
-rw-r--r--  src/state.rs         65
-rw-r--r--  src/task.rs         390
-rw-r--r--  src/utils.rs         48
7 files changed, 1772 insertions, 0 deletions
diff --git a/src/header.rs b/src/header.rs
new file mode 100644
index 0000000..0ce5164
--- /dev/null
+++ b/src/header.rs
@@ -0,0 +1,158 @@
+use std::alloc::Layout;
+use std::cell::Cell;
+use std::fmt;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::task::Waker;
+
+use crossbeam_utils::Backoff;
+
+use crate::raw::TaskVTable;
+use crate::state::*;
+use crate::utils::{abort_on_panic, extend};
+
+/// The header of a task.
+///
+/// This header is stored right at the beginning of every heap-allocated task.
+pub(crate) struct Header {
+ /// Current state of the task.
+ ///
+ /// Contains flags representing the current state and the reference count.
+ pub(crate) state: AtomicUsize,
+
+ /// The task that is blocked on the `JoinHandle`.
+ ///
+ /// This waker needs to be woken once the task completes or is closed.
+ pub(crate) awaiter: Cell<Option<Waker>>,
+
+ /// The virtual table.
+ ///
+ /// In addition to the actual waker virtual table, it also contains pointers to several other
+ /// methods necessary for bookkeeping the heap-allocated task.
+ pub(crate) vtable: &'static TaskVTable,
+}
+
+impl Header {
+ /// Cancels the task.
+ ///
+ /// This method will only mark the task as closed and will notify the awaiter, but it won't
+ /// reschedule the task if it's not completed.
+ pub(crate) fn cancel(&self) {
+ let mut state = self.state.load(Ordering::Acquire);
+
+ loop {
+ // If the task has been completed or closed, it can't be cancelled.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
+
+ // Mark the task as closed.
+ match self.state.compare_exchange_weak(
+ state,
+ state | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Notify the awaiter that the task has been closed.
+ if state & AWAITER != 0 {
+ self.notify();
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+
+ /// Notifies the awaiter blocked on this task.
+ ///
+ /// If there is a registered waker, it will be removed from the header and woken.
+ #[inline]
+ pub(crate) fn notify(&self) {
+ if let Some(waker) = self.swap_awaiter(None) {
+ // We need a safeguard against panics because waking can panic.
+ abort_on_panic(|| {
+ waker.wake();
+ });
+ }
+ }
+
+ /// Notifies the awaiter blocked on this task unless its waker matches `current`.
+ ///
+ /// If there is a registered waker, it will be removed from the header.
+ #[inline]
+ pub(crate) fn notify_unless(&self, current: &Waker) {
+ if let Some(waker) = self.swap_awaiter(None) {
+ if !waker.will_wake(current) {
+ // We need a safeguard against panics because waking can panic.
+ abort_on_panic(|| {
+ waker.wake();
+ });
+ }
+ }
+ }
+
+ /// Swaps the awaiter and returns the previous value.
+ #[inline]
+ pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
+ let new_is_none = new.is_none();
+
+ // We're about to try acquiring the lock in a loop. If it's already being held by another
+ // thread, we'll have to spin for a while so it's best to employ a backoff strategy.
+ let backoff = Backoff::new();
+ loop {
+ // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag.
+ let state = if new_is_none {
+ self.state.fetch_or(LOCKED, Ordering::Acquire)
+ } else {
+ self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire)
+ };
+
+ // If the lock was acquired, break from the loop.
+ if state & LOCKED == 0 {
+ break;
+ }
+
+ // Snooze for a little while because the lock is held by another thread.
+ backoff.snooze();
+ }
+
+ // Replace the awaiter.
+ let old = self.awaiter.replace(new);
+
+ // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
+ if new_is_none {
+ self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release);
+ } else {
+ self.state.fetch_and(!LOCKED, Ordering::Release);
+ }
+
+ old
+ }
+
+ /// Returns the offset at which the tag of type `T` is stored.
+ #[inline]
+ pub(crate) fn offset_tag<T>() -> usize {
+ let layout_header = Layout::new::<Header>();
+ let layout_t = Layout::new::<T>();
+ let (_, offset_t) = extend(layout_header, layout_t);
+ offset_t
+ }
+}
+
+impl fmt::Debug for Header {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let state = self.state.load(Ordering::SeqCst);
+
+ f.debug_struct("Header")
+ .field("scheduled", &(state & SCHEDULED != 0))
+ .field("running", &(state & RUNNING != 0))
+ .field("completed", &(state & COMPLETED != 0))
+ .field("closed", &(state & CLOSED != 0))
+ .field("awaiter", &(state & AWAITER != 0))
+ .field("handle", &(state & HANDLE != 0))
+ .field("ref_count", &(state / REFERENCE))
+ .finish()
+ }
+}
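+
+// A quick check of the layout arithmetic behind `offset_tag` (an illustrative sketch, not an
+// exhaustive test): the header's layout is extended with the tag's layout, so a 1-byte-aligned
+// tag starts exactly where the header ends.
+#[cfg(test)]
+mod tests {
+    use std::mem;
+
+    use super::Header;
+
+    #[test]
+    fn tag_offset_follows_header() {
+        assert_eq!(Header::offset_tag::<u8>(), mem::size_of::<Header>());
+        assert!(Header::offset_tag::<usize>() >= mem::size_of::<Header>());
+    }
+}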
diff --git a/src/join_handle.rs b/src/join_handle.rs
new file mode 100644
index 0000000..fb5c275
--- /dev/null
+++ b/src/join_handle.rs
@@ -0,0 +1,333 @@
+use std::fmt;
+use std::future::Future;
+use std::marker::{PhantomData, Unpin};
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering;
+use std::task::{Context, Poll};
+
+use crate::header::Header;
+use crate::state::*;
+use crate::utils::abort_on_panic;
+
+/// A handle that awaits the result of a task.
+///
+/// If the task has completed with `value`, the handle returns it as `Some(value)`. If the task was
+/// cancelled or has panicked, the handle returns `None`. Otherwise, the handle has to wait until
+/// the task completes, panics, or gets cancelled.
+///
+/// # Examples
+///
+/// ```
+/// #![feature(async_await)]
+///
+/// use crossbeam::channel;
+/// use futures::executor;
+///
+/// // The future inside the task.
+/// let future = async { 1 + 2 };
+///
+/// // If the task gets woken, it will be sent into this channel.
+/// let (s, r) = channel::unbounded();
+/// let schedule = move |task| s.send(task).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (task, handle) = async_task::spawn(future, schedule, ());
+///
+/// // Run the task. In this example, it will complete after a single run.
+/// task.run();
+/// assert!(r.is_empty());
+///
+/// // Await the result of the task.
+/// let result = executor::block_on(handle);
+/// assert_eq!(result, Some(3));
+/// ```
+pub struct JoinHandle<R, T> {
+ /// A raw task pointer.
+ pub(crate) raw_task: NonNull<()>,
+
+ /// A marker capturing the generic types `R` and `T`.
+ pub(crate) _marker: PhantomData<(R, T)>,
+}
+
+unsafe impl<R, T> Send for JoinHandle<R, T> {}
+unsafe impl<R, T> Sync for JoinHandle<R, T> {}
+
+impl<R, T> Unpin for JoinHandle<R, T> {}
+
+impl<R, T> JoinHandle<R, T> {
+ /// Cancels the task.
+ ///
+ /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
+ /// to run it won't do anything. And if it's completed, awaiting its result evaluates to
+ /// `None`.
+ ///
+ /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #![feature(async_await)]
+ /// use crossbeam::channel;
+ /// use futures::executor;
+ ///
+ /// // The future inside the task.
+ /// let future = async { 1 + 2 };
+ ///
+ /// // If the task gets woken, it will be sent into this channel.
+ /// let (s, r) = channel::unbounded();
+ /// let schedule = move |task| s.send(task).unwrap();
+ ///
+ /// // Create a task with the future and the schedule function.
+ /// let (task, handle) = async_task::spawn(future, schedule, ());
+ ///
+ /// // Cancel the task.
+ /// handle.cancel();
+ ///
+ /// // Running a cancelled task does nothing.
+ /// task.run();
+ ///
+ /// // Await the result of the task.
+ /// let result = executor::block_on(handle);
+ /// assert_eq!(result, None);
+ /// ```
+ pub fn cancel(&self) {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let mut state = (*header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task has been completed or closed, it can't be cancelled.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
+
+ // If the task is not scheduled nor running, we'll need to schedule it.
+ let new = if state & (SCHEDULED | RUNNING) == 0 {
+ (state | SCHEDULED | CLOSED) + REFERENCE
+ } else {
+ state | CLOSED
+ };
+
+ // Mark the task as closed.
+ match (*header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not scheduled nor running, schedule it so that its future
+ // gets dropped by the executor.
+ if state & (SCHEDULED | RUNNING) == 0 {
+ ((*header).vtable.schedule)(ptr);
+ }
+
+ // Notify the awaiter that the task has been closed.
+ if state & AWAITER != 0 {
+ (*header).notify();
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// Returns a reference to the tag stored inside the task.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #![feature(async_await)]
+ /// use crossbeam::channel;
+ ///
+ /// // The future inside the task.
+ /// let future = async { 1 + 2 };
+ ///
+ /// // If the task gets woken, it will be sent into this channel.
+ /// let (s, r) = channel::unbounded();
+ /// let schedule = move |task| s.send(task).unwrap();
+ ///
+ /// // Create a task with the future and the schedule function.
+ /// let (task, handle) = async_task::spawn(future, schedule, "a simple task");
+ ///
+ /// // Access the tag.
+ /// assert_eq!(*handle.tag(), "a simple task");
+ /// ```
+ pub fn tag(&self) -> &T {
+ let offset = Header::offset_tag::<T>();
+ let ptr = self.raw_task.as_ptr();
+
+ unsafe {
+ let raw = (ptr as *mut u8).add(offset) as *const T;
+ &*raw
+ }
+ }
+}
+
+impl<R, T> Drop for JoinHandle<R, T> {
+ fn drop(&mut self) {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+
+ // A place where the output will be stored in case it needs to be dropped.
+ let mut output = None;
+
+ unsafe {
+ // Optimistically assume the `JoinHandle` is being dropped just after creating the
+ // task. This is a common case, so if the handle is not used, its overhead is only a
+ // single compare-exchange operation.
+ if let Err(mut state) = (*header).state.compare_exchange_weak(
+ SCHEDULED | HANDLE | REFERENCE,
+ SCHEDULED | REFERENCE,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ loop {
+ // If the task has been completed but not yet closed, that means its output
+ // must be dropped.
+ if state & COMPLETED != 0 && state & CLOSED == 0 {
+ // Mark the task as closed in order to grab its output.
+ match (*header).state.compare_exchange_weak(
+ state,
+ state | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Read the output.
+ output =
+ Some((((*header).vtable.get_output)(ptr) as *mut R).read());
+
+ // Update the state variable because we're continuing the loop.
+ state |= CLOSED;
+ }
+ Err(s) => state = s,
+ }
+ } else {
+ // If this is the last reference to the task and it's not closed, then close
+ // it and schedule one more time so that its future gets dropped by the
+ // executor.
+ let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
+ SCHEDULED | CLOSED | REFERENCE
+ } else {
+ state & !HANDLE
+ };
+
+ // Unset the handle flag.
+ match (*header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If this is the last reference to the task, we need to either
+ // schedule dropping its future or destroy it.
+ if state & !(REFERENCE - 1) == 0 {
+ if state & CLOSED == 0 {
+ ((*header).vtable.schedule)(ptr);
+ } else {
+ ((*header).vtable.destroy)(ptr);
+ }
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+ }
+
+ // Drop the output if it was taken out of the task.
+ drop(output);
+ }
+}
+
+impl<R, T> Future for JoinHandle<R, T> {
+ type Output = Option<R>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let mut state = (*header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task has been closed, notify the awaiter and return `None`.
+ if state & CLOSED != 0 {
+ // Even though the awaiter is most likely the current task, it could also be
+ // another task.
+ (*header).notify_unless(cx.waker());
+ return Poll::Ready(None);
+ }
+
+ // If the task is not completed, register the current task.
+ if state & COMPLETED == 0 {
+ // Replace the waker with one associated with the current task. We need a
+ // safeguard against panics because dropping the previous waker can panic.
+ abort_on_panic(|| {
+ (*header).swap_awaiter(Some(cx.waker().clone()));
+ });
+
+ // Reload the state after registering. It is possible that the task became
+ // completed or closed just before registration so we need to check for that.
+ state = (*header).state.load(Ordering::Acquire);
+
+ // If the task has been closed, notify the awaiter and return `None`.
+ if state & CLOSED != 0 {
+ // Even though the awaiter is most likely the current task, it could also
+ // be another task.
+ (*header).notify_unless(cx.waker());
+ return Poll::Ready(None);
+ }
+
+ // If the task is still not completed, we're blocked on it.
+ if state & COMPLETED == 0 {
+ return Poll::Pending;
+ }
+ }
+
+ // Since the task is now completed, mark it as closed in order to grab its output.
+ match (*header).state.compare_exchange(
+ state,
+ state | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Notify the awaiter. Even though the awaiter is most likely the current
+ // task, it could also be another task.
+ if state & AWAITER != 0 {
+ (*header).notify_unless(cx.waker());
+ }
+
+ // Take the output from the task.
+ let output = ((*header).vtable.get_output)(ptr) as *mut R;
+ return Poll::Ready(Some(output.read()));
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+}
+
+impl<R, T> fmt::Debug for JoinHandle<R, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+
+ f.debug_struct("JoinHandle")
+ .field("header", unsafe { &(*header) })
+ .finish()
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..5518515
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,149 @@
+//! Task abstraction for building executors.
+//!
+//! # What is an executor?
+//!
+//! An async block creates a future and an async function returns one. But futures don't do
+//! anything unless they are awaited inside other async blocks or async functions. So the question
+//! arises: who or what awaits the main future that awaits others?
+//!
+//! One solution is to call [`block_on()`] on the main future, which will block
+//! the current thread and keep polling the future until it completes. But sometimes we don't want
+//! to block the current thread and would prefer to *spawn* the future to let a background thread
+//! block on it instead.
+//!
+//! This is where executors step in - they create a number of threads (typically equal to the
+//! number of CPU cores on the system) that are dedicated to polling spawned futures. Each executor
+//! thread keeps polling spawned futures in a loop and only blocks when all spawned futures are
+//! either sleeping or running.
+//!
+//! # What is a task?
+//!
+//! In order to spawn a future on an executor, one needs to allocate the future on the heap and
+//! keep some state alongside it, like whether the future is ready for polling, waiting to be woken
+//! up, or completed. This allocation is usually called a *task*.
+//!
+//! The executor then runs the spawned task by polling its future. If the future is pending on a
+//! resource, a [`Waker`] associated with the task will be registered somewhere so that the task
+//! can be woken up and run again at a later time.
+//!
+//! For example, if the future wants to read something from a TCP socket that is not ready yet, the
+//! networking system will clone the task's waker and wake it up once the socket becomes ready.
+//!
+//! # Task construction
+//!
+//! A task is constructed with the [`spawn()`] function:
+//!
+//! ```
+//! # #![feature(async_await)]
+//! let future = async { 1 + 2 };
+//! let schedule = |task| unimplemented!();
+//!
+//! let (task, handle) = async_task::spawn(future, schedule, ());
+//! ```
+//!
+//! The first argument is the future that gets polled when the task is run.
+//!
+//! The second argument is the schedule function, which is called every time the task gets woken
+//! up. This function should push the received task into some kind of queue of runnable tasks.
+//!
+//! The third argument, `()` in this example, is an arbitrary piece of data called a *tag*. This
+//! can be a task identifier, a task name, task-local storage, or something of similar nature.
+//!
+//! The constructor returns a runnable [`Task`] and a [`JoinHandle`] that can await the result of
+//! the future.
+//!
+//! # Task scheduling
+//!
+//! TODO
+//!
+//! # Join handles
+//!
+//! TODO
+//!
+//! # Cancellation
+//!
+//! TODO
+//!
+//! # Performance
+//!
+//! TODO: explain single allocation, etc.
+//!
+//! Task [construction] incurs a single allocation only. The [`Task`] can then be run and its
+//! result awaited through the [`JoinHandle`]. When woken, the task gets automatically rescheduled.
+//! It's also possible to cancel the task so that it stops running and can't be awaited anymore.
+//!
+//! [construction]: fn.spawn.html
+//! [`spawn()`]: fn.spawn.html
+//! [`JoinHandle`]: struct.JoinHandle.html
+//! [`Task`]: struct.Task.html
+//! [`Future`]: https://doc.rust-lang.org/nightly/std/future/trait.Future.html
+//! [`Waker`]: https://doc.rust-lang.org/nightly/std/task/struct.Waker.html
+//! [`block_on()`]: https://docs.rs/futures-preview/*/futures/executor/fn.block_on.html
+//!
+//! # Examples
+//!
+//! A simple single-threaded executor:
+//!
+//! ```
+//! # #![feature(async_await)]
+//! use std::future::Future;
+//! use std::panic::catch_unwind;
+//! use std::thread;
+//!
+//! use async_task::{JoinHandle, Task};
+//! use crossbeam::channel::{unbounded, Sender};
+//! use futures::executor;
+//! use lazy_static::lazy_static;
+//!
+//! /// Spawns a future on the executor.
+//! fn spawn<F, R>(future: F) -> JoinHandle<R, ()>
+//! where
+//! F: Future<Output = R> + Send + 'static,
+//! R: Send + 'static,
+//! {
+//! lazy_static! {
+//! // A channel that holds scheduled tasks.
+//! static ref QUEUE: Sender<Task<()>> = {
+//! let (sender, receiver) = unbounded::<Task<()>>();
+//!
+//! // Start the executor thread.
+//! thread::spawn(|| {
+//! for task in receiver {
+//! // Ignore panics for simplicity.
+//! let _ignore_panic = catch_unwind(|| task.run());
+//! }
+//! });
+//!
+//! sender
+//! };
+//! }
+//!
+//! // Create a task that is scheduled by sending itself into the channel.
+//! let schedule = |t| QUEUE.send(t).unwrap();
+//! let (task, handle) = async_task::spawn(future, schedule, ());
+//!
+//! // Schedule the task by sending it into the channel.
+//! task.schedule();
+//!
+//! handle
+//! }
+//!
+//! // Spawn a future and await its result.
+//! let handle = spawn(async {
+//! println!("Hello, world!");
+//! });
+//! executor::block_on(handle);
+//! ```
+
+#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
+
+mod header;
+mod join_handle;
+mod raw;
+mod state;
+mod task;
+mod utils;
+
+pub use crate::join_handle::JoinHandle;
+pub use crate::task::{spawn, Task};
diff --git a/src/raw.rs b/src/raw.rs
new file mode 100644
index 0000000..6928427
--- /dev/null
+++ b/src/raw.rs
@@ -0,0 +1,629 @@
+use std::alloc::{self, Layout};
+use std::cell::Cell;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::mem::{self, ManuallyDrop};
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+
+use crate::header::Header;
+use crate::state::*;
+use crate::utils::{abort_on_panic, extend};
+use crate::Task;
+
+/// The vtable for a task.
+pub(crate) struct TaskVTable {
+ /// The raw waker vtable.
+ pub(crate) raw_waker: RawWakerVTable,
+
+ /// Schedules the task.
+ pub(crate) schedule: unsafe fn(*const ()),
+
+ /// Drops the future inside the task.
+ pub(crate) drop_future: unsafe fn(*const ()),
+
+ /// Returns a pointer to the output stored after completion.
+ pub(crate) get_output: unsafe fn(*const ()) -> *const (),
+
+ /// Drops a waker or a task.
+ pub(crate) decrement: unsafe fn(ptr: *const ()),
+
+ /// Destroys the task.
+ pub(crate) destroy: unsafe fn(*const ()),
+
+ /// Runs the task.
+ pub(crate) run: unsafe fn(*const ()),
+}
+
+/// Memory layout of a task.
+///
+/// This struct contains the information on:
+///
+/// 1. How to allocate and deallocate the task.
+/// 2. How to access the fields inside the task.
+#[derive(Clone, Copy)]
+pub(crate) struct TaskLayout {
+ /// Memory layout of the whole task.
+ pub(crate) layout: Layout,
+
+ /// Offset into the task at which the tag is stored.
+ pub(crate) offset_t: usize,
+
+ /// Offset into the task at which the schedule function is stored.
+ pub(crate) offset_s: usize,
+
+ /// Offset into the task at which the future is stored.
+ pub(crate) offset_f: usize,
+
+ /// Offset into the task at which the output is stored.
+ pub(crate) offset_r: usize,
+}
+
+/// Raw pointers to the fields of a task.
+pub(crate) struct RawTask<F, R, S, T> {
+ /// The task header.
+ pub(crate) header: *const Header,
+
+ /// The schedule function.
+ pub(crate) schedule: *const S,
+
+ /// The tag inside the task.
+ pub(crate) tag: *mut T,
+
+ /// The future.
+ pub(crate) future: *mut F,
+
+ /// The output of the future.
+ pub(crate) output: *mut R,
+}
+
+impl<F, R, S, T> Copy for RawTask<F, R, S, T> {}
+
+impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
+ fn clone(&self) -> Self {
+ Self {
+ header: self.header,
+ schedule: self.schedule,
+ tag: self.tag,
+ future: self.future,
+ output: self.output,
+ }
+ }
+}
+
+impl<F, R, S, T> RawTask<F, R, S, T>
+where
+ F: Future<Output = R> + Send + 'static,
+ R: Send + 'static,
+ S: Fn(Task<T>) + Send + Sync + 'static,
+ T: Send + 'static,
+{
+ /// Allocates a task with the given `future` and `schedule` function.
+ ///
+ /// It is assumed there are initially only the `Task` reference and the `JoinHandle`.
+ pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> {
+ // Compute the layout of the task for allocation. Abort if the computation fails.
+ let task_layout = abort_on_panic(|| Self::task_layout());
+
+ unsafe {
+ // Allocate enough space for the entire task.
+ let raw_task = match NonNull::new(alloc::alloc(task_layout.layout) as *mut ()) {
+ None => std::process::abort(),
+ Some(p) => p,
+ };
+
+ let raw = Self::from_ptr(raw_task.as_ptr());
+
+ // Write the header as the first field of the task.
+ (raw.header as *mut Header).write(Header {
+ state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
+ awaiter: Cell::new(None),
+ vtable: &TaskVTable {
+ raw_waker: RawWakerVTable::new(
+ Self::clone_waker,
+ Self::wake,
+ Self::wake_by_ref,
+ Self::decrement,
+ ),
+ schedule: Self::schedule,
+ drop_future: Self::drop_future,
+ get_output: Self::get_output,
+ decrement: Self::decrement,
+ destroy: Self::destroy,
+ run: Self::run,
+ },
+ });
+
+ // Write the tag as the second field of the task.
+ (raw.tag as *mut T).write(tag);
+
+ // Write the schedule function as the third field of the task.
+ (raw.schedule as *mut S).write(schedule);
+
+ // Write the future as the fourth field of the task.
+ raw.future.write(future);
+
+ raw_task
+ }
+ }
+
+ /// Creates a `RawTask` from a raw task pointer.
+ #[inline]
+ pub(crate) fn from_ptr(ptr: *const ()) -> Self {
+ let task_layout = Self::task_layout();
+ let p = ptr as *const u8;
+
+ unsafe {
+ Self {
+ header: p as *const Header,
+ tag: p.add(task_layout.offset_t) as *mut T,
+ schedule: p.add(task_layout.offset_s) as *const S,
+ future: p.add(task_layout.offset_f) as *mut F,
+ output: p.add(task_layout.offset_r) as *mut R,
+ }
+ }
+ }
+
+ /// Returns the memory layout for a task.
+ #[inline]
+ fn task_layout() -> TaskLayout {
+ // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`.
+ let layout_header = Layout::new::<Header>();
+ let layout_t = Layout::new::<T>();
+ let layout_s = Layout::new::<S>();
+ let layout_f = Layout::new::<F>();
+ let layout_r = Layout::new::<R>();
+
+ // Compute the layout for `union { F, R }`.
+ let size_union = layout_f.size().max(layout_r.size());
+ let align_union = layout_f.align().max(layout_r.align());
+ let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };
+
+ // Compute the layout for `Header` followed by `T`, then `S`, then `union { F, R }`.
+ let layout = layout_header;
+ let (layout, offset_t) = extend(layout, layout_t);
+ let (layout, offset_s) = extend(layout, layout_s);
+ let (layout, offset_union) = extend(layout, layout_union);
+ let offset_f = offset_union;
+ let offset_r = offset_union;
+
+ TaskLayout {
+ layout,
+ offset_t,
+ offset_s,
+ offset_f,
+ offset_r,
+ }
+ }
+
+ /// Wakes a waker.
+ unsafe fn wake(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task is completed or closed, it can't be woken.
+ if state & (COMPLETED | CLOSED) != 0 {
+ // Drop the waker.
+ Self::decrement(ptr);
+ break;
+ }
+
+ // If the task is already scheduled, we just need to synchronize with the thread that
+ // will run the task by "publishing" our current view of the memory.
+ if state & SCHEDULED != 0 {
+ // Update the state without actually modifying it.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Drop the waker.
+ Self::decrement(ptr);
+ break;
+ }
+ Err(s) => state = s,
+ }
+ } else {
+ // Mark the task as scheduled.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state | SCHEDULED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not yet scheduled and isn't currently running, now is the
+ // time to schedule it.
+ if state & (SCHEDULED | RUNNING) == 0 {
+ // Schedule the task.
+ let task = Task {
+ raw_task: NonNull::new_unchecked(ptr as *mut ()),
+ _marker: PhantomData,
+ };
+ (*raw.schedule)(task);
+ } else {
+ // Drop the waker.
+ Self::decrement(ptr);
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// Wakes a waker by reference.
+ unsafe fn wake_by_ref(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task is completed or closed, it can't be woken.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
+
+ // If the task is already scheduled, we just need to synchronize with the thread that
+ // will run the task by "publishing" our current view of the memory.
+ if state & SCHEDULED != 0 {
+ // Update the state without actually modifying it.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(s) => state = s,
+ }
+ } else {
+ // If the task is not scheduled nor running, we'll need to schedule after waking.
+ let new = if state & (SCHEDULED | RUNNING) == 0 {
+ (state | SCHEDULED) + REFERENCE
+ } else {
+ state | SCHEDULED
+ };
+
+ // Mark the task as scheduled.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not scheduled nor running, now is the time to schedule.
+ if state & (SCHEDULED | RUNNING) == 0 {
+ // If the reference count overflowed, abort.
+ if state > isize::max_value() as usize {
+ std::process::abort();
+ }
+
+ // Schedule the task.
+ let task = Task {
+ raw_task: NonNull::new_unchecked(ptr as *mut ()),
+ _marker: PhantomData,
+ };
+ (*raw.schedule)(task);
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// Clones a waker.
+ unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+ let raw = Self::from_ptr(ptr);
+ let raw_waker = &(*raw.header).vtable.raw_waker;
+
+ // Increment the reference count. With any kind of reference-counted data structure,
+ // relaxed ordering is fine when the reference is being cloned.
+ let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
+
+ // If the reference count overflowed, abort.
+ if state > isize::max_value() as usize {
+ std::process::abort();
+ }
+
+ RawWaker::new(ptr, raw_waker)
+ }
+
+ /// Drops a waker or a task.
+ ///
+ /// This function will decrement the reference count. If it drops down to zero and the
+ /// associated join handle has been dropped too, then the task gets destroyed.
+ #[inline]
+ unsafe fn decrement(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // Decrement the reference count.
+ let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+ // If this was the last reference to the task and the `JoinHandle` has been dropped as
+ // well, then destroy the task.
+ if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
+ Self::destroy(ptr);
+ }
+ }
+
+ /// Schedules a task for running.
+ ///
+ /// This function doesn't modify the state of the task. It only passes the task reference to
+ /// its schedule function.
+ unsafe fn schedule(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ (*raw.schedule)(Task {
+ raw_task: NonNull::new_unchecked(ptr as *mut ()),
+ _marker: PhantomData,
+ });
+ }
+
+ /// Drops the future inside a task.
+ #[inline]
+ unsafe fn drop_future(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // We need a safeguard against panics because the destructor can panic.
+ abort_on_panic(|| {
+ raw.future.drop_in_place();
+ })
+ }
+
+ /// Returns a pointer to the output inside a task.
+ unsafe fn get_output(ptr: *const ()) -> *const () {
+ let raw = Self::from_ptr(ptr);
+ raw.output as *const ()
+ }
+
+ /// Cleans up task's resources and deallocates it.
+ ///
+ /// If the task has not been closed, then its future or the output will be dropped. The
+ /// schedule function and the tag get dropped too.
+ #[inline]
+ unsafe fn destroy(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+ let task_layout = Self::task_layout();
+
+ // We need a safeguard against panics because destructors can panic.
+ abort_on_panic(|| {
+ // Drop the schedule function.
+ (raw.schedule as *mut S).drop_in_place();
+
+ // Drop the tag.
+ (raw.tag as *mut T).drop_in_place();
+ });
+
+ // Finally, deallocate the memory reserved by the task.
+ alloc::dealloc(ptr as *mut u8, task_layout.layout);
+ }
+
+ /// Runs a task.
+ ///
+ /// If polling its future panics, the task will be closed and the panic propagated into the
+ /// caller.
+ unsafe fn run(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // Create a context from the raw task pointer and the vtable inside its header.
+ let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
+ ptr,
+ &(*raw.header).vtable.raw_waker,
+ )));
+ let cx = &mut Context::from_waker(&waker);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ // Update the task's state before polling its future.
+ loop {
+ // If the task has been closed, drop the task reference and return.
+ if state & CLOSED != 0 {
+ // Notify the awaiter that the task has been closed.
+ if state & AWAITER != 0 {
+ (*raw.header).notify();
+ }
+
+ // Drop the future.
+ Self::drop_future(ptr);
+
+ // Drop the task reference.
+ Self::decrement(ptr);
+ return;
+ }
+
+ // Mark the task as unscheduled and running.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ (state & !SCHEDULED) | RUNNING,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Update the state because we're continuing with polling the future.
+ state = (state & !SCHEDULED) | RUNNING;
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+
+ // Poll the inner future, but surround it with a guard that closes the task in case polling
+ // panics.
+ let guard = Guard(raw);
+ let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
+ mem::forget(guard);
+
+ match poll {
+ Poll::Ready(out) => {
+ // Replace the future with its output.
+ Self::drop_future(ptr);
+ raw.output.write(out);
+
+ // A place where the output will be stored in case it needs to be dropped.
+ let mut output = None;
+
+ // The task is now completed.
+ loop {
+ // If the handle is dropped, we'll need to close it and drop the output.
+ let new = if state & HANDLE == 0 {
+ (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
+ } else {
+ (state & !RUNNING & !SCHEDULED) | COMPLETED
+ };
+
+ // Mark the task as not running and completed.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the handle is dropped or if the task was closed while running,
+ // now it's time to drop the output.
+ if state & HANDLE == 0 || state & CLOSED != 0 {
+ // Read the output.
+ output = Some(raw.output.read());
+ }
+
+ // Notify the awaiter that the task has been completed.
+ if state & AWAITER != 0 {
+ (*raw.header).notify();
+ }
+
+ // Drop the task reference.
+ Self::decrement(ptr);
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+
+ // Drop the output if it was taken out of the task.
+ drop(output);
+ }
+ Poll::Pending => {
+ // The task is still not completed.
+ loop {
+ // If the task was closed while running, we'll need to unschedule in case it
+ // was woken and then clean up its resources.
+ let new = if state & CLOSED != 0 {
+ state & !RUNNING & !SCHEDULED
+ } else {
+ state & !RUNNING
+ };
+
+ // Mark the task as not running.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(state) => {
+ // If the task was closed while running, we need to drop its future.
+ // If the task was woken while running, we need to schedule it.
+ // Otherwise, we just drop the task reference.
+ if state & CLOSED != 0 {
+ // The thread that closed the task didn't drop the future because
+ // it was running so now it's our responsibility to do so.
+ Self::drop_future(ptr);
+
+ // Drop the task reference.
+ Self::decrement(ptr);
+ } else if state & SCHEDULED != 0 {
+ // The thread that has woken the task didn't reschedule it because
+ // it was running so now it's our responsibility to do so.
+ Self::schedule(ptr);
+ } else {
+ // Drop the task reference.
+ Self::decrement(ptr);
+ }
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// A guard that closes the task if polling its future panics.
+ struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
+ where
+ F: Future<Output = R> + Send + 'static,
+ R: Send + 'static,
+ S: Fn(Task<T>) + Send + Sync + 'static,
+ T: Send + 'static;
+
+ impl<F, R, S, T> Drop for Guard<F, R, S, T>
+ where
+ F: Future<Output = R> + Send + 'static,
+ R: Send + 'static,
+ S: Fn(Task<T>) + Send + Sync + 'static,
+ T: Send + 'static,
+ {
+ fn drop(&mut self) {
+ let raw = self.0;
+ let ptr = raw.header as *const ();
+
+ unsafe {
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task was closed while running, then unschedule it, drop its
+ // future, and drop the task reference.
+ if state & CLOSED != 0 {
+ // We still need to unschedule the task because it is possible it was
+ // woken while running.
+ (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+ // The thread that closed the task didn't drop the future because it
+ // was running so now it's our responsibility to do so.
+ RawTask::<F, R, S, T>::drop_future(ptr);
+
+ // Drop the task reference.
+ RawTask::<F, R, S, T>::decrement(ptr);
+ break;
+ }
+
+ // Mark the task as not running, not scheduled, and closed.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ (state & !RUNNING & !SCHEDULED) | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(state) => {
+ // Drop the future because the task is now closed.
+ RawTask::<F, R, S, T>::drop_future(ptr);
+
+ // Notify the awaiter that the task has been closed.
+ if state & AWAITER != 0 {
+ (*raw.header).notify();
+ }
+
+ // Drop the task reference.
+ RawTask::<F, R, S, T>::decrement(ptr);
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+ }
+ }
+}
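+
+// An illustrative sketch of the invariants described by `TaskLayout` above: for one concrete
+// instantiation (a made-up, immediately-ready future used only for this check), the tag, the
+// schedule function, and the `union { future, output }` come after the header in that order,
+// and the future and the output share a single offset.
+#[cfg(test)]
+mod tests {
+    use std::future::Future;
+    use std::mem;
+    use std::pin::Pin;
+    use std::task::{Context, Poll};
+
+    use crate::header::Header;
+    use crate::Task;
+
+    use super::RawTask;
+
+    /// A future that completes immediately with a `u32`.
+    struct Done;
+
+    impl Future for Done {
+        type Output = u32;
+
+        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
+            Poll::Ready(0)
+        }
+    }
+
+    #[test]
+    fn fields_are_laid_out_in_order() {
+        let l = RawTask::<Done, u32, fn(Task<()>), ()>::task_layout();
+
+        assert!(l.offset_t >= mem::size_of::<Header>());
+        assert!(l.offset_s >= l.offset_t);
+        assert!(l.offset_f > l.offset_s);
+        assert_eq!(l.offset_f, l.offset_r);
+        assert!(l.layout.size() >= l.offset_r + mem::size_of::<u32>());
+    }
+}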
diff --git a/src/state.rs b/src/state.rs
new file mode 100644
index 0000000..d6ce34f
--- /dev/null
+++ b/src/state.rs
@@ -0,0 +1,65 @@
+/// Set if the task is scheduled for running.
+///
+/// A task is considered to be scheduled whenever its `Task` reference exists. It is in the
+/// scheduled state at the moment of creation and when it gets woken by a `Waker` or rescheduled
+/// by its `JoinHandle`.
+///
+/// This flag can't be set when the task is completed. However, it can be set while the task is
+/// running, in which case it will be rescheduled as soon as polling finishes.
+pub(crate) const SCHEDULED: usize = 1 << 0;
+
+/// Set if the task is running.
+///
+/// A task is in the running state while its future is being polled.
+///
+/// This flag can't be set when the task is completed. However, the task can be in the scheduled
+/// state while it is running, in which case it will be rescheduled when it stops being polled.
+pub(crate) const RUNNING: usize = 1 << 1;
+
+/// Set if the task has been completed.
+///
+/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored
+/// inside the task until the task becomes closed. In fact, `JoinHandle` picks up the output by
+/// marking the task as closed.
+///
+/// This flag can't be set when the task is scheduled or running.
+pub(crate) const COMPLETED: usize = 1 << 2;
+
+/// Set if the task is closed.
+///
+/// If a task is closed, that means it's either cancelled or its output has been consumed by the
+/// `JoinHandle`. A task becomes closed when:
+///
+/// 1. It gets cancelled by `Task::cancel()` or `JoinHandle::cancel()`.
+/// 2. Its output is awaited by the `JoinHandle`.
+/// 3. It panics while polling the future.
+/// 4. It is completed and the `JoinHandle` is dropped.
+pub(crate) const CLOSED: usize = 1 << 3;
+
+/// Set if the `JoinHandle` still exists.
+///
+/// The `JoinHandle` is a special case in that it is only tracked by this flag, while all other
+/// task references (`Task` and `Waker`s) are tracked by the reference count.
+pub(crate) const HANDLE: usize = 1 << 4;
+
+/// Set if the `JoinHandle` is awaiting the output.
+///
+/// This flag is set while there is a registered awaiter of type `Waker` inside the task. When the
+/// task gets closed or completed, we need to wake the awaiter. This flag can be used as a fast
+/// check that tells us if we need to wake anyone without acquiring the lock inside the task.
+pub(crate) const AWAITER: usize = 1 << 5;
+
+/// Set if the awaiter is locked.
+///
+/// This lock is acquired before a new awaiter is registered or the existing one is woken.
+pub(crate) const LOCKED: usize = 1 << 6;
+
+/// A single reference.
+///
+/// The lower bits in the state contain various flags representing the task state, while the upper
+/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the
+/// total reference count.
+///
+/// Note that the reference counter only tracks the `Task` and `Waker`s. The `JoinHandle` is
+/// tracked separately by the `HANDLE` flag.
+pub(crate) const REFERENCE: usize = 1 << 7;
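+
+// A quick illustration of how the state word is packed: the low bits hold the flags above and
+// everything from `REFERENCE` upward is the reference count, so the state written in
+// `RawTask::allocate` (`SCHEDULED | HANDLE | REFERENCE`) carries exactly one counted reference.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn initial_state_packing() {
+        let state = SCHEDULED | HANDLE | REFERENCE;
+
+        assert_eq!(state & SCHEDULED, SCHEDULED);
+        assert_eq!(state & HANDLE, HANDLE);
+        assert_eq!(state & (RUNNING | COMPLETED | CLOSED | AWAITER | LOCKED), 0);
+        assert_eq!(state / REFERENCE, 1);
+    }
+}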
diff --git a/src/task.rs b/src/task.rs
new file mode 100644
index 0000000..8bfc164
--- /dev/null
+++ b/src/task.rs
@@ -0,0 +1,390 @@
+use std::fmt;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr::NonNull;
+
+use crate::header::Header;
+use crate::raw::RawTask;
+use crate::JoinHandle;
+
+/// Creates a new task.
+///
+/// This constructor returns a `Task` reference that runs the future and a [`JoinHandle`] that
+/// awaits its result.
+///
+/// The `tag` is stored inside the allocated task.
+///
+/// When run, the task polls `future`. When woken, it gets scheduled for running by the
+/// `schedule` function.
+///
+/// # Examples
+///
+/// ```
+/// # #![feature(async_await)]
+/// use crossbeam::channel;
+///
+/// // The future inside the task.
+/// let future = async {
+/// println!("Hello, world!");
+/// };
+///
+/// // If the task gets woken, it will be sent into this channel.
+/// let (s, r) = channel::unbounded();
+/// let schedule = move |task| s.send(task).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (task, handle) = async_task::spawn(future, schedule, ());
+/// ```
+///
+/// [`JoinHandle`]: struct.JoinHandle.html
+pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
+where
+ F: Future<Output = R> + Send + 'static,
+ R: Send + 'static,
+ S: Fn(Task<T>) + Send + Sync + 'static,
+ T: Send + Sync + 'static,
+{
+ let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule);
+ let task = Task {
+ raw_task,
+ _marker: PhantomData,
+ };
+ let handle = JoinHandle {
+ raw_task,
+ _marker: PhantomData,
+ };
+ (task, handle)
+}
+
+/// A task that runs a future.
+///
+/// # Construction
+///
+/// A task is a heap-allocated structure containing:
+///
+/// * A reference counter.
+/// * The state of the task.
+/// * An arbitrary piece of data called a *tag*.
+/// * A function that schedules the task when woken.
+/// * A future or its result if polling has completed.
+///
+/// The constructor [`spawn()`] returns a [`Task`] and a [`JoinHandle`]. Those two references
+/// are like two sides of the task: one runs the future and the other awaits its result.
+///
+/// # Behavior
+///
+/// The [`Task`] reference "owns" the task itself and is used to [run] it. Running consumes the
+/// [`Task`] reference and polls its internal future. If the future is still pending after being
+/// polled, the [`Task`] reference will be recreated when woken by a [`Waker`]. If the future
+/// completes, its result becomes available to the [`JoinHandle`].
+///
+/// The [`JoinHandle`] is a [`Future`] that awaits the result of the task.
+///
+/// When the task is woken, its [`Task`] reference is recreated and passed to the schedule function
+/// provided during construction. In most executors, scheduling simply pushes the [`Task`] into a
+/// queue of runnable tasks.
+///
+/// If the [`Task`] reference is dropped without being run, the task is cancelled.
+///
+/// Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task
+/// won't be scheduled again even if a [`Waker`] wakes it or the [`JoinHandle`] is polled. An
+/// attempt to run a cancelled task won't do anything. And if the cancelled task has already
+/// completed, awaiting its result through [`JoinHandle`] will return `None`.
+///
+/// If polling the task's future panics, it gets cancelled automatically.
+///
+/// # Task states
+///
+/// A task can be in the following states:
+///
+/// * Sleeping: The [`Task`] reference doesn't exist and is waiting to be scheduled by a [`Waker`].
+/// * Scheduled: The [`Task`] reference exists and is waiting to be [run].
+/// * Completed: The [`Task`] reference doesn't exist anymore and can't be rescheduled, but its
+/// result is available to the [`JoinHandle`].
+/// * Cancelled: The [`Task`] reference may or may not exist, but running it does nothing and
+/// awaiting the [`JoinHandle`] returns `None`.
+///
+/// When constructed, the task is initially in the scheduled state.
+///
+/// # Destruction
+///
+/// The future inside the task gets dropped in the following cases:
+///
+/// * When [`Task`] is dropped.
+/// * When [`Task`] is run to completion.
+///
+/// If the future hasn't been dropped and the last [`Waker`] or [`JoinHandle`] is dropped, or if
+/// a [`JoinHandle`] cancels the task, then the task will be scheduled one last time so that its
+/// future gets dropped by the executor. In other words, the task's future can be dropped only by
+/// [`Task`].
+///
+/// When the task completes, the result of its future is stored inside the allocation. This result
+/// is taken out when the [`JoinHandle`] awaits it. When the task is cancelled or the
+/// [`JoinHandle`] is dropped without being awaited, the result gets dropped too.
+///
+/// The task gets deallocated when all references to it are dropped, which includes the [`Task`],
+/// the [`JoinHandle`], and any associated [`Waker`]s.
+///
+/// The tag inside the task and the schedule function get dropped at the time of deallocation.
+///
+/// # Panics
+///
+/// If polling the inner future inside [`run()`] panics, the panic will be propagated into
+/// the caller. Likewise, a panic inside the task result's destructor will be propagated. All other
+/// panics result in the process being aborted.
+///
+/// More precisely, the process is aborted if a panic occurs:
+///
+/// * Inside the schedule function.
+/// * While dropping the tag.
+/// * While dropping the future.
+/// * While dropping the schedule function.
+/// * While waking the task awaiting the [`JoinHandle`].
+///
+/// [`run()`]: struct.Task.html#method.run
+/// [run]: struct.Task.html#method.run
+/// [`JoinHandle`]: struct.JoinHandle.html
+/// [`Task`]: struct.Task.html
+/// [`spawn()`]: fn.spawn.html
+/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html
+/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
+///
+/// # Examples
+///
+/// ```
+/// # #![feature(async_await)]
+/// use async_task::Task;
+/// use crossbeam::channel;
+/// use futures::executor;
+///
+/// // The future inside the task.
+/// let future = async {
+/// println!("Hello, world!");
+/// };
+///
+/// // If the task gets woken, it will be sent into this channel.
+/// let (s, r) = channel::unbounded();
+/// let schedule = move |task| s.send(task).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (task, handle) = async_task::spawn(future, schedule, ());
+///
+/// // Run the task. In this example, it will complete after a single run.
+/// task.run();
+/// assert!(r.is_empty());
+///
+/// // Await its result.
+/// executor::block_on(handle);
+/// ```
+pub struct Task<T> {
+ /// A pointer to the heap-allocated task.
+ pub(crate) raw_task: NonNull<()>,
+
+ /// A marker capturing the generic type `T`.
+ pub(crate) _marker: PhantomData<T>,
+}
+
+unsafe impl<T> Send for Task<T> {}
+unsafe impl<T> Sync for Task<T> {}
+
+impl<T> Task<T> {
+ /// Schedules the task.
+ ///
+ /// This is a convenience method that simply reschedules the task by passing it to its schedule
+ /// function.
+ ///
+ /// If the task is cancelled, this method won't do anything.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #![feature(async_await)]
+ /// use crossbeam::channel;
+ ///
+ /// // The future inside the task.
+ /// let future = async {
+ /// println!("Hello, world!");
+ /// };
+ ///
+ /// // If the task gets woken, it will be sent into this channel.
+ /// let (s, r) = channel::unbounded();
+ /// let schedule = move |task| s.send(task).unwrap();
+ ///
+ /// // Create a task with the future and the schedule function.
+ /// let (task, handle) = async_task::spawn(future, schedule, ());
+ ///
+ /// // Send the task into the channel.
+ /// task.schedule();
+ ///
+ /// // Retrieve the task back from the channel.
+ /// let task = r.recv().unwrap();
+ /// ```
+ pub fn schedule(self) {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+ mem::forget(self);
+
+ unsafe {
+ ((*header).vtable.schedule)(ptr);
+ }
+ }
+
+ /// Runs the task.
+ ///
+ /// This method polls the task's future. If the future completes, its result will become
+ /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
+ /// be woken in order to be rescheduled and then run again.
+ ///
+ /// If the task is cancelled, running it won't do anything.
+ ///
+ /// # Panics
+ ///
+ /// It is possible that polling the future panics, in which case the panic will be propagated
+ /// into the caller. It is advised that invocations of this method are wrapped inside
+ /// [`catch_unwind`].
+ ///
+ /// If a panic occurs, the task is automatically cancelled.
+ ///
+ /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #![feature(async_await)]
+ /// use crossbeam::channel;
+ /// use futures::executor;
+ ///
+ /// // The future inside the task.
+ /// let future = async { 1 + 2 };
+ ///
+ /// // If the task gets woken, it will be sent into this channel.
+ /// let (s, r) = channel::unbounded();
+ /// let schedule = move |task| s.send(task).unwrap();
+ ///
+ /// // Create a task with the future and the schedule function.
+ /// let (task, handle) = async_task::spawn(future, schedule, ());
+ ///
+ /// // Run the task. In this example, it will complete after a single run.
+ /// task.run();
+ /// assert!(r.is_empty());
+ ///
+ /// // Await the result of the task.
+ /// let result = executor::block_on(handle);
+ /// assert_eq!(result, Some(3));
+ /// ```
+ pub fn run(self) {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+ mem::forget(self);
+
+ unsafe {
+ ((*header).vtable.run)(ptr);
+ }
+ }
+
+ /// Cancels the task.
+ ///
+ /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
+ /// to run it won't do anything. And if it's completed, awaiting its result evaluates to
+ /// `None`.
+ ///
+ /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #![feature(async_await)]
+ /// use crossbeam::channel;
+ /// use futures::executor;
+ ///
+ /// // The future inside the task.
+ /// let future = async { 1 + 2 };
+ ///
+ /// // If the task gets woken, it will be sent into this channel.
+ /// let (s, r) = channel::unbounded();
+ /// let schedule = move |task| s.send(task).unwrap();
+ ///
+ /// // Create a task with the future and the schedule function.
+ /// let (task, handle) = async_task::spawn(future, schedule, ());
+ ///
+ /// // Cancel the task.
+ /// task.cancel();
+ ///
+ /// // Running a cancelled task does nothing.
+ /// task.run();
+ ///
+ /// // Await the result of the task.
+ /// let result = executor::block_on(handle);
+ /// assert_eq!(result, None);
+ /// ```
+ pub fn cancel(&self) {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ (*header).cancel();
+ }
+ }
+
+ /// Returns a reference to the tag stored inside the task.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # #![feature(async_await)]
+ /// use crossbeam::channel;
+ ///
+ /// // The future inside the task.
+ /// let future = async { 1 + 2 };
+ ///
+ /// // If the task gets woken, it will be sent into this channel.
+ /// let (s, r) = channel::unbounded();
+ /// let schedule = move |task| s.send(task).unwrap();
+ ///
+ /// // Create a task with the future and the schedule function.
+ /// let (task, handle) = async_task::spawn(future, schedule, "a simple task");
+ ///
+ /// // Access the tag.
+ /// assert_eq!(*task.tag(), "a simple task");
+ /// ```
+ pub fn tag(&self) -> &T {
+ let offset = Header::offset_tag::<T>();
+ let ptr = self.raw_task.as_ptr();
+
+ unsafe {
+ let raw = (ptr as *mut u8).add(offset) as *const T;
+ &*raw
+ }
+ }
+}
+
+impl<T> Drop for Task<T> {
+ fn drop(&mut self) {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ // Cancel the task.
+ (*header).cancel();
+
+ // Drop the future.
+ ((*header).vtable.drop_future)(ptr);
+
+ // Drop the task reference.
+ ((*header).vtable.decrement)(ptr);
+ }
+ }
+}
+
+impl<T: fmt::Debug> fmt::Debug for Task<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let ptr = self.raw_task.as_ptr();
+ let header = ptr as *const Header;
+
+ f.debug_struct("Task")
+ .field("header", unsafe { &(*header) })
+ .field("tag", self.tag())
+ .finish()
+ }
+}
diff --git a/src/utils.rs b/src/utils.rs
new file mode 100644
index 0000000..441ead1
--- /dev/null
+++ b/src/utils.rs
@@ -0,0 +1,48 @@
+use std::alloc::Layout;
+use std::mem;
+
+/// Calls a function and aborts if it panics.
+///
+/// This is useful in unsafe code where we can't recover from panics.
+#[inline]
+pub(crate) fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
+ struct Bomb;
+
+ impl Drop for Bomb {
+ fn drop(&mut self) {
+ std::process::abort();
+ }
+ }
+
+ let bomb = Bomb;
+ let t = f();
+ mem::forget(bomb);
+ t
+}
+
+/// Returns the layout for `a` followed by `b` and the offset of `b`.
+///
+/// This function was adapted from the currently unstable `Layout::extend()`:
+/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.extend
+#[inline]
+pub(crate) fn extend(a: Layout, b: Layout) -> (Layout, usize) {
+ let new_align = a.align().max(b.align());
+ let pad = padding_needed_for(a, b.align());
+
+ let offset = a.size().checked_add(pad).unwrap();
+ let new_size = offset.checked_add(b.size()).unwrap();
+
+ let layout = Layout::from_size_align(new_size, new_align).unwrap();
+ (layout, offset)
+}
+
+/// Returns the padding after `layout` that aligns the following address to `align`.
+///
+/// This function was adapted from the currently unstable `Layout::padding_needed_for()`:
+/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.padding_needed_for
+#[inline]
+pub(crate) fn padding_needed_for(layout: Layout, align: usize) -> usize {
+ let len = layout.size();
+ let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1);
+ len_rounded_up.wrapping_sub(len)
+}
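+
+// A small worked example of the helpers above (an illustrative sketch, not an exhaustive test):
+// a 5-byte layout followed by a 4-byte, 4-byte-aligned layout needs 3 bytes of padding, so the
+// second field lands at offset 8 and the combined layout is 12 bytes with alignment 4.
+// `abort_on_panic` simply passes the closure's result through when nothing panics.
+#[cfg(test)]
+mod tests {
+    use std::alloc::Layout;
+
+    use super::{abort_on_panic, extend, padding_needed_for};
+
+    #[test]
+    fn abort_on_panic_returns_the_value() {
+        assert_eq!(abort_on_panic(|| 1 + 2), 3);
+    }
+
+    #[test]
+    fn extend_and_padding() {
+        let a = Layout::from_size_align(5, 1).unwrap();
+        let b = Layout::from_size_align(4, 4).unwrap();
+
+        assert_eq!(padding_needed_for(a, 4), 3);
+
+        let (layout, offset) = extend(a, b);
+        assert_eq!(offset, 8);
+        assert_eq!(layout.size(), 12);
+        assert_eq!(layout.align(), 4);
+    }
+}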