aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStjepan Glavina <stjepang@gmail.com>2019-08-20 15:29:43 +0200
committerStjepan Glavina <stjepang@gmail.com>2019-11-15 20:55:26 +0000
commit5c398cfa74092510d061148ef01ce1a3ded85916 (patch)
tree70c33a84112d9be7cf0bdcbf9fdedb21cb752551 /src
parent8b8c8a15d479a46bd278d75e00a2d8a71802f7a8 (diff)
downloadasync-task-5c398cfa74092510d061148ef01ce1a3ded85916.tar.gz
Reschedule if last waker is dropped
Diffstat (limited to 'src')
-rw-r--r--src/header.rs16
-rw-r--r--src/join_handle.rs12
-rw-r--r--src/lib.rs8
-rw-r--r--src/raw.rs95
-rw-r--r--src/state.rs25
-rw-r--r--src/task.rs32
6 files changed, 107 insertions, 81 deletions
diff --git a/src/header.rs b/src/header.rs
index 0ce5164..f9aff4f 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -21,7 +21,7 @@ pub(crate) struct Header {
/// The task that is blocked on the `JoinHandle`.
///
- /// This waker needs to be woken once the task completes or is closed.
+ /// This waker needs to be woken up once the task completes or is closed.
pub(crate) awaiter: Cell<Option<Waker>>,
/// The virtual table.
@@ -34,8 +34,8 @@ pub(crate) struct Header {
impl Header {
/// Cancels the task.
///
- /// This method will only mark the task as closed and will notify the awaiter, but it won't
- /// reschedule the task if it's not completed.
+ /// This method will mark the task as closed and notify the awaiter, but it won't reschedule
+ /// the task if it's not completed.
pub(crate) fn cancel(&self) {
let mut state = self.state.load(Ordering::Acquire);
@@ -65,9 +65,9 @@ impl Header {
}
}
- /// Notifies the task blocked on the task.
+ /// Notifies the awaiter blocked on this task.
///
- /// If there is a registered waker, it will be removed from the header and woken.
+ /// If there is a registered waker, it will be removed from the header and woken up.
#[inline]
pub(crate) fn notify(&self) {
if let Some(waker) = self.swap_awaiter(None) {
@@ -78,9 +78,9 @@ impl Header {
}
}
- /// Notifies the task blocked on the task unless its waker matches `current`.
+ /// Notifies the awaiter blocked on this task, unless its waker matches `current`.
///
- /// If there is a registered waker, it will be removed from the header.
+ /// If there is a registered waker, it will be removed from the header in any case.
#[inline]
pub(crate) fn notify_unless(&self, current: &Waker) {
if let Some(waker) = self.swap_awaiter(None) {
@@ -93,7 +93,7 @@ impl Header {
}
}
- /// Swaps the awaiter and returns the previous value.
+ /// Swaps the awaiter for a new waker and returns the previous value.
#[inline]
pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
let new_is_none = new.is_none();
diff --git a/src/join_handle.rs b/src/join_handle.rs
index d4fed50..bd9366f 100644
--- a/src/join_handle.rs
+++ b/src/join_handle.rs
@@ -14,13 +14,13 @@ use crate::utils::abort_on_panic;
///
/// This type is a future that resolves to an `Option<R>` where:
///
-/// * `None` indicates the task has panicked or was cancelled
-/// * `Some(res)` indicates the task has completed with `res`
+/// * `None` indicates the task has panicked or was cancelled.
+/// * `Some(result)` indicates the task has completed with `result` of type `R`.
pub struct JoinHandle<R, T> {
/// A raw task pointer.
pub(crate) raw_task: NonNull<()>,
- /// A marker capturing the generic type `R`.
+ /// A marker capturing generic types `R` and `T`.
pub(crate) _marker: PhantomData<(R, T)>,
}
@@ -34,7 +34,7 @@ impl<R, T> JoinHandle<R, T> {
///
/// If the task has already completed, calling this method will have no effect.
///
- /// When a task is cancelled, its future cannot be polled again and will be dropped instead.
+ /// When a task is cancelled, its future will not be polled again.
pub fn cancel(&self) {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
@@ -63,8 +63,8 @@ impl<R, T> JoinHandle<R, T> {
Ordering::Acquire,
) {
Ok(_) => {
- // If the task is not scheduled nor running, schedule it so that its future
- // gets dropped by the executor.
+ // If the task is not scheduled nor running, schedule it one more time so
+ // that its future gets dropped by the executor.
if state & (SCHEDULED | RUNNING) == 0 {
((*header).vtable.schedule)(ptr);
}
diff --git a/src/lib.rs b/src/lib.rs
index 153cb43..3f61ea4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -77,8 +77,8 @@
//!
//! # Cancellation
//!
-//! Both [`Task`] and [`JoinHandle`] have a method that cancels the task. When cancelled, the
-//! task's future will not be polled again and will get dropped instead.
+//! Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task's
+//! future will not be polled again and will get dropped instead.
//!
//! If cancelled by the [`Task`] instance, the task is destroyed immediately. If cancelled by the
//! [`JoinHandle`] instance, it will be scheduled one more time and the next attempt to run it will
@@ -86,8 +86,8 @@
//!
//! # Performance
//!
-//! Task construction incurs a single allocation only that holds its state, the schedule function,
-//! and the future or the result of the future if completed.
+//! Task construction incurs a single allocation that holds its state, the schedule function, and
+//! the future or the result of the future if completed.
//!
//! The layout of a task is equivalent to 4 words followed by the schedule function, and then by a
//! union of the future and its output.
diff --git a/src/raw.rs b/src/raw.rs
index c1e5b3a..3b993a3 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -27,8 +27,8 @@ pub(crate) struct TaskVTable {
/// Returns a pointer to the output stored after completion.
pub(crate) get_output: unsafe fn(*const ()) -> *const (),
- /// Drops a waker or a task.
- pub(crate) decrement: unsafe fn(ptr: *const ()),
+ /// Drops the task.
+ pub(crate) drop_task: unsafe fn(ptr: *const ()),
/// Destroys the task.
pub(crate) destroy: unsafe fn(*const ()),
@@ -39,7 +39,7 @@ pub(crate) struct TaskVTable {
/// Memory layout of a task.
///
-/// This struct contains the information on:
+/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
@@ -61,7 +61,7 @@ pub(crate) struct TaskLayout {
pub(crate) offset_r: usize,
}
-/// Raw pointers to the fields of a task.
+/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, R, S, T> {
/// The task header.
pub(crate) header: *const Header,
@@ -102,7 +102,7 @@ where
{
/// Allocates a task with the given `future` and `schedule` function.
///
- /// It is assumed there are initially only the `Task` reference and the `JoinHandle`.
+ /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> {
// Compute the layout of the task for allocation. Abort if the computation fails.
let task_layout = abort_on_panic(|| Self::task_layout());
@@ -125,12 +125,12 @@ where
Self::clone_waker,
Self::wake,
Self::wake_by_ref,
- Self::decrement,
+ Self::drop_waker,
),
schedule: Self::schedule,
drop_future: Self::drop_future,
get_output: Self::get_output,
- decrement: Self::decrement,
+ drop_task: Self::drop_task,
destroy: Self::destroy,
run: Self::run,
},
@@ -181,7 +181,7 @@ where
let align_union = layout_f.align().max(layout_r.align());
let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };
- // Compute the layout for `Header` followed by `T`, then `S`, then `union { F, R }`.
+ // Compute the layout for `Header` followed by `T`, then `S`, and finally `union { F, R }`.
let layout = layout_header;
let (layout, offset_t) = extend(layout, layout_t);
let (layout, offset_s) = extend(layout, layout_s);
@@ -205,10 +205,10 @@ where
let mut state = (*raw.header).state.load(Ordering::Acquire);
loop {
- // If the task is completed or closed, it can't be woken.
+ // If the task is completed or closed, it can't be woken up.
if state & (COMPLETED | CLOSED) != 0 {
// Drop the waker.
- Self::decrement(ptr);
+ Self::drop_waker(ptr);
break;
}
@@ -224,7 +224,7 @@ where
) {
Ok(_) => {
// Drop the waker.
- Self::decrement(ptr);
+ Self::drop_waker(ptr);
break;
}
Err(s) => state = s,
@@ -249,7 +249,7 @@ where
(*raw.schedule)(task);
} else {
// Drop the waker.
- Self::decrement(ptr);
+ Self::drop_waker(ptr);
}
break;
@@ -267,7 +267,7 @@ where
let mut state = (*raw.header).state.load(Ordering::Acquire);
loop {
- // If the task is completed or closed, it can't be woken.
+ // If the task is completed or closed, it can't be woken up.
if state & (COMPLETED | CLOSED) != 0 {
break;
}
@@ -330,7 +330,7 @@ where
let raw_waker = &(*raw.header).vtable.raw_waker;
// Increment the reference count. With any kind of reference-counted data structure,
- // relaxed ordering is fine when the reference is being cloned.
+ // relaxed ordering is appropriate when incrementing the counter.
let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
// If the reference count overflowed, abort.
@@ -341,19 +341,48 @@ where
RawWaker::new(ptr, raw_waker)
}
- /// Drops a waker or a task.
+ /// Drops a waker.
+ ///
+ /// This function will decrement the reference count. If it drops down to zero, the associated
+ /// join handle has been dropped too, and the task has not been completed, then it will get
+ /// scheduled one more time so that its future gets dropped by the executor.
+ #[inline]
+ unsafe fn drop_waker(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // Decrement the reference count.
+ let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+ // If this was the last reference to the task and the `JoinHandle` has been dropped too,
+ // then we need to decide how to destroy the task.
+ if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
+ if new & (COMPLETED | CLOSED) == 0 {
+ // If the task was not completed nor closed, close it and schedule one more time so
+ // that its future gets dropped by the executor.
+ (*raw.header)
+ .state
+ .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
+ ((*raw.header).vtable.schedule)(ptr);
+ } else {
+ // Otherwise, destroy the task right away.
+ Self::destroy(ptr);
+ }
+ }
+ }
+
+ /// Drops a task.
///
/// This function will decrement the reference count. If it drops down to zero and the
/// associated join handle has been dropped too, then the task gets destroyed.
#[inline]
- unsafe fn decrement(ptr: *const ()) {
+ unsafe fn drop_task(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
// Decrement the reference count.
let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
- // If this was the last reference to the task and the `JoinHandle` has been dropped as
- // well, then destroy the task.
+ // If this was the last reference to the task and the `JoinHandle` has been dropped too,
+ // then destroy the task.
if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
Self::destroy(ptr);
}
@@ -391,8 +420,8 @@ where
/// Cleans up task's resources and deallocates it.
///
- /// If the task has not been closed, then its future or the output will be dropped. The
- /// schedule function and the tag get dropped too.
+ /// The schedule function and the tag will be dropped, and the task will then get deallocated.
+ /// The task must be closed before this function is called.
#[inline]
unsafe fn destroy(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
@@ -413,8 +442,8 @@ where
/// Runs a task.
///
- /// If polling its future panics, the task will be closed and the panic propagated into the
- /// caller.
+ /// If polling its future panics, the task will be closed and the panic will be propagated into
+ /// the caller.
unsafe fn run(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
@@ -429,7 +458,7 @@ where
// Update the task's state before polling its future.
loop {
- // If the task has been closed, drop the task reference and return.
+ // If the task has already been closed, drop the task reference and return.
if state & CLOSED != 0 {
// Notify the awaiter that the task has been closed.
if state & AWAITER != 0 {
@@ -440,7 +469,7 @@ where
Self::drop_future(ptr);
// Drop the task reference.
- Self::decrement(ptr);
+ Self::drop_task(ptr);
return;
}
@@ -505,7 +534,7 @@ where
}
// Drop the task reference.
- Self::decrement(ptr);
+ Self::drop_task(ptr);
break;
}
Err(s) => state = s,
@@ -519,7 +548,7 @@ where
// The task is still not completed.
loop {
// If the task was closed while running, we'll need to unschedule in case it
- // was woken and then clean up its resources.
+ // was woken up and then destroy it.
let new = if state & CLOSED != 0 {
state & !RUNNING & !SCHEDULED
} else {
@@ -535,7 +564,7 @@ where
) {
Ok(state) => {
// If the task was closed while running, we need to drop its future.
- // If the task was woken while running, we need to schedule it.
+ // If the task was woken up while running, we need to schedule it.
// Otherwise, we just drop the task reference.
if state & CLOSED != 0 {
// The thread that closed the task didn't drop the future because
@@ -543,14 +572,14 @@ where
Self::drop_future(ptr);
// Drop the task reference.
- Self::decrement(ptr);
+ Self::drop_task(ptr);
} else if state & SCHEDULED != 0 {
- // The thread that has woken the task didn't reschedule it because
+ // The thread that woke the task up didn't reschedule it because
// it was running so now it's our responsibility to do so.
Self::schedule(ptr);
} else {
// Drop the task reference.
- Self::decrement(ptr);
+ Self::drop_task(ptr);
}
break;
}
@@ -587,7 +616,7 @@ where
// future, and drop the task reference.
if state & CLOSED != 0 {
// We still need to unschedule the task because it is possible it was
- // woken while running.
+ // woken up while running.
(*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
// The thread that closed the task didn't drop the future because it
@@ -595,7 +624,7 @@ where
RawTask::<F, R, S, T>::drop_future(ptr);
// Drop the task reference.
- RawTask::<F, R, S, T>::decrement(ptr);
+ RawTask::<F, R, S, T>::drop_task(ptr);
break;
}
@@ -616,7 +645,7 @@ where
}
// Drop the task reference.
- RawTask::<F, R, S, T>::decrement(ptr);
+ RawTask::<F, R, S, T>::drop_task(ptr);
break;
}
Err(s) => state = s,
diff --git a/src/state.rs b/src/state.rs
index d6ce34f..c03fea3 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,8 +1,7 @@
/// Set if the task is scheduled for running.
///
-/// A task is considered to be scheduled whenever its `Task` reference exists. It is in scheduled
-/// state at the moment of creation and when it gets unapused either by its `JoinHandle` or woken
-/// by a `Waker`.
+/// A task is considered to be scheduled whenever its `Task` reference exists. It therefore also
+/// begins in scheduled state at the moment of creation.
///
/// This flag can't be set when the task is completed. However, it can be set while the task is
/// running, in which case it will be rescheduled as soon as polling finishes.
@@ -10,30 +9,30 @@ pub(crate) const SCHEDULED: usize = 1 << 0;
/// Set if the task is running.
///
-/// A task is running state while its future is being polled.
+/// A task is in running state while its future is being polled.
///
/// This flag can't be set when the task is completed. However, it can be in scheduled state while
-/// it is running, in which case it will be rescheduled when it stops being polled.
+/// it is running, in which case it will be rescheduled as soon as polling finishes.
pub(crate) const RUNNING: usize = 1 << 1;
/// Set if the task has been completed.
///
/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored
-/// inside the task until it becomes stopped. In fact, `JoinHandle` picks the output up by marking
-/// the task as stopped.
+/// inside the task until it becomes closed. In fact, `JoinHandle` picks up the output by marking
+/// the task as closed.
///
-/// This flag can't be set when the task is scheduled or completed.
+/// This flag can't be set when the task is scheduled or running.
pub(crate) const COMPLETED: usize = 1 << 2;
/// Set if the task is closed.
///
-/// If a task is closed, that means its either cancelled or its output has been consumed by the
+/// If a task is closed, that means it's either cancelled or its output has been consumed by the
/// `JoinHandle`. A task becomes closed when:
///
-/// 1. It gets cancelled by `Task::cancel()` or `JoinHandle::cancel()`.
-/// 2. Its output is awaited by the `JoinHandle`.
+/// 1. It gets cancelled by `Task::cancel()`, `Task::drop()`, or `JoinHandle::cancel()`.
+/// 2. Its output gets awaited by the `JoinHandle`.
/// 3. It panics while polling the future.
-/// 4. It is completed and the `JoinHandle` is dropped.
+/// 4. It is completed and the `JoinHandle` gets dropped.
pub(crate) const CLOSED: usize = 1 << 3;
/// Set if the `JoinHandle` still exists.
@@ -51,7 +50,7 @@ pub(crate) const AWAITER: usize = 1 << 5;
/// Set if the awaiter is locked.
///
-/// This lock is acquired before a new awaiter is registered or the existing one is woken.
+/// This lock is acquired before a new awaiter is registered or the existing one is woken up.
pub(crate) const LOCKED: usize = 1 << 6;
/// A single reference.
diff --git a/src/task.rs b/src/task.rs
index 9097f44..42a4024 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -13,8 +13,8 @@ use crate::JoinHandle;
/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
/// awaits its result.
///
-/// When run, the task polls `future`. When woken, it gets scheduled for running by the `schedule`
-/// function. Argument `tag` is an arbitrary piece of data stored inside the task.
+/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
+/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
///
/// [`Task`]: struct.Task.html
/// [`JoinHandle`]: struct.JoinHandle.html
@@ -29,7 +29,7 @@ use crate::JoinHandle;
/// println!("Hello, world!");
/// };
///
-/// // If the task gets woken, it will be sent into this channel.
+/// // If the task gets woken up, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
@@ -57,19 +57,19 @@ where
/// A task reference that runs its future.
///
-/// The [`Task`] reference "owns" the task itself and is able to run it. Running consumes the
-/// [`Task`] reference and polls its internal future. If the future is still pending after getting
-/// polled, the [`Task`] reference simply won't exist until a [`Waker`] notifies the task. If the
-/// future completes, its result becomes available to the [`JoinHandle`].
+/// At any moment in time, there is at most one [`Task`] reference associated with a particular
+/// task. Running consumes the [`Task`] reference and polls its internal future. If the future is
+/// still pending after getting polled, the [`Task`] reference simply won't exist until a [`Waker`]
+/// notifies the task. If the future completes, its result becomes available to the [`JoinHandle`].
///
-/// When the task is woken, the [`Task`] reference is recreated and passed to the schedule
+/// When a task is woken up, its [`Task`] reference is recreated and passed to the schedule
/// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of
/// runnable tasks.
///
-/// If the [`Task`] reference is dropped without being run, the task is cancelled. When cancelled,
-/// the task won't be scheduled again even if a [`Waker`] wakes it. It is possible for the
-/// [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt to run
-/// the task won't do anything.
+/// If the [`Task`] reference is dropped without getting run, the task is automatically cancelled.
+/// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. It is possible
+/// for the [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt
+/// to run the task won't do anything.
///
/// [`run()`]: struct.Task.html#method.run
/// [`JoinHandle`]: struct.JoinHandle.html
@@ -107,16 +107,14 @@ impl<T> Task<T> {
///
/// This method polls the task's future. If the future completes, its result will become
/// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
- /// be woken in order to be rescheduled and then run again.
+ /// be woken up in order to be rescheduled and run again.
///
/// If the task was cancelled by a [`JoinHandle`] before it gets run, then this method won't do
/// anything.
///
/// It is possible that polling the future panics, in which case the panic will be propagated
/// into the caller. It is advised that invocations of this method are wrapped inside
- /// [`catch_unwind`].
- ///
- /// If a panic occurs, the task is automatically cancelled.
+ /// [`catch_unwind`]. If a panic occurs, the task is automatically cancelled.
///
/// [`JoinHandle`]: struct.JoinHandle.html
/// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
@@ -170,7 +168,7 @@ impl<T> Drop for Task<T> {
((*header).vtable.drop_future)(ptr);
// Drop the task reference.
- ((*header).vtable.decrement)(ptr);
+ ((*header).vtable.drop_task)(ptr);
}
}
}