diff options
author | Stjepan Glavina <stjepang@gmail.com> | 2019-08-20 15:29:43 +0200 |
---|---|---|
committer | Stjepan Glavina <stjepang@gmail.com> | 2019-11-15 20:55:26 +0000 |
commit | 5c398cfa74092510d061148ef01ce1a3ded85916 (patch) | |
tree | 70c33a84112d9be7cf0bdcbf9fdedb21cb752551 /src | |
parent | 8b8c8a15d479a46bd278d75e00a2d8a71802f7a8 (diff) | |
download | async-task-5c398cfa74092510d061148ef01ce1a3ded85916.tar.gz |
Reschedule if last waker is dropped
Diffstat (limited to 'src')
-rw-r--r-- | src/header.rs | 16 | ||||
-rw-r--r-- | src/join_handle.rs | 12 | ||||
-rw-r--r-- | src/lib.rs | 8 | ||||
-rw-r--r-- | src/raw.rs | 95 | ||||
-rw-r--r-- | src/state.rs | 25 | ||||
-rw-r--r-- | src/task.rs | 32 |
6 files changed, 107 insertions, 81 deletions
diff --git a/src/header.rs b/src/header.rs index 0ce5164..f9aff4f 100644 --- a/src/header.rs +++ b/src/header.rs @@ -21,7 +21,7 @@ pub(crate) struct Header { /// The task that is blocked on the `JoinHandle`. /// - /// This waker needs to be woken once the task completes or is closed. + /// This waker needs to be woken up once the task completes or is closed. pub(crate) awaiter: Cell<Option<Waker>>, /// The virtual table. @@ -34,8 +34,8 @@ pub(crate) struct Header { impl Header { /// Cancels the task. /// - /// This method will only mark the task as closed and will notify the awaiter, but it won't - /// reschedule the task if it's not completed. + /// This method will mark the task as closed and notify the awaiter, but it won't reschedule + /// the task if it's not completed. pub(crate) fn cancel(&self) { let mut state = self.state.load(Ordering::Acquire); @@ -65,9 +65,9 @@ impl Header { } } - /// Notifies the task blocked on the task. + /// Notifies the awaiter blocked on this task. /// - /// If there is a registered waker, it will be removed from the header and woken. + /// If there is a registered waker, it will be removed from the header and woken up. #[inline] pub(crate) fn notify(&self) { if let Some(waker) = self.swap_awaiter(None) { @@ -78,9 +78,9 @@ impl Header { } } - /// Notifies the task blocked on the task unless its waker matches `current`. + /// Notifies the awaiter blocked on this task, unless its waker matches `current`. /// - /// If there is a registered waker, it will be removed from the header. + /// If there is a registered waker, it will be removed from the header in any case. #[inline] pub(crate) fn notify_unless(&self, current: &Waker) { if let Some(waker) = self.swap_awaiter(None) { @@ -93,7 +93,7 @@ impl Header { } } - /// Swaps the awaiter and returns the previous value. + /// Swaps the awaiter for a new waker and returns the previous value. #[inline] pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> { let new_is_none = new.is_none(); diff --git a/src/join_handle.rs b/src/join_handle.rs index d4fed50..bd9366f 100644 --- a/src/join_handle.rs +++ b/src/join_handle.rs @@ -14,13 +14,13 @@ use crate::utils::abort_on_panic; /// /// This type is a future that resolves to an `Option<R>` where: /// -/// * `None` indicates the task has panicked or was cancelled -/// * `Some(res)` indicates the task has completed with `res` +/// * `None` indicates the task has panicked or was cancelled. +/// * `Some(result)` indicates the task has completed with `result` of type `R`. pub struct JoinHandle<R, T> { /// A raw task pointer. pub(crate) raw_task: NonNull<()>, - /// A marker capturing the generic type `R`. + /// A marker capturing generic types `R` and `T`. pub(crate) _marker: PhantomData<(R, T)>, } @@ -34,7 +34,7 @@ impl<R, T> JoinHandle<R, T> { /// /// If the task has already completed, calling this method will have no effect. /// - /// When a task is cancelled, its future cannot be polled again and will be dropped instead. + /// When a task is cancelled, its future will not be polled again. pub fn cancel(&self) { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; @@ -63,8 +63,8 @@ impl<R, T> JoinHandle<R, T> { Ordering::Acquire, ) { Ok(_) => { - // If the task is not scheduled nor running, schedule it so that its future - // gets dropped by the executor. + // If the task is not scheduled nor running, schedule it one more time so + // that its future gets dropped by the executor. if state & (SCHEDULED | RUNNING) == 0 { ((*header).vtable.schedule)(ptr); } @@ -77,8 +77,8 @@ //! //! # Cancellation //! -//! Both [`Task`] and [`JoinHandle`] have a method that cancels the task. When cancelled, the -//! task's future will not be polled again and will get dropped instead. +//! Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task's +//! future will not be polled again and will get dropped instead. //! //! If cancelled by the [`Task`] instance, the task is destroyed immediately. If cancelled by the //! [`JoinHandle`] instance, it will be scheduled one more time and the next attempt to run it will @@ -86,8 +86,8 @@ //! //! # Performance //! -//! Task construction incurs a single allocation only that holds its state, the schedule function, -//! and the future or the result of the future if completed. +//! Task construction incurs a single allocation that holds its state, the schedule function, and +//! the future or the result of the future if completed. //! //! The layout of a task is equivalent to 4 words followed by the schedule function, and then by a //! union of the future and its output. @@ -27,8 +27,8 @@ pub(crate) struct TaskVTable { /// Returns a pointer to the output stored after completion. pub(crate) get_output: unsafe fn(*const ()) -> *const (), - /// Drops a waker or a task. - pub(crate) decrement: unsafe fn(ptr: *const ()), + /// Drops the task. + pub(crate) drop_task: unsafe fn(ptr: *const ()), /// Destroys the task. pub(crate) destroy: unsafe fn(*const ()), @@ -39,7 +39,7 @@ pub(crate) struct TaskVTable { /// Memory layout of a task. /// -/// This struct contains the information on: +/// This struct contains the following information: /// /// 1. How to allocate and deallocate the task. /// 2. How to access the fields inside the task. @@ -61,7 +61,7 @@ pub(crate) struct TaskLayout { pub(crate) offset_r: usize, } -/// Raw pointers to the fields of a task. +/// Raw pointers to the fields inside a task. pub(crate) struct RawTask<F, R, S, T> { /// The task header. pub(crate) header: *const Header, @@ -102,7 +102,7 @@ where { /// Allocates a task with the given `future` and `schedule` function. /// - /// It is assumed there are initially only the `Task` reference and the `JoinHandle`. + /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist. pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> { // Compute the layout of the task for allocation. Abort if the computation fails. let task_layout = abort_on_panic(|| Self::task_layout()); @@ -125,12 +125,12 @@ where Self::clone_waker, Self::wake, Self::wake_by_ref, - Self::decrement, + Self::drop_waker, ), schedule: Self::schedule, drop_future: Self::drop_future, get_output: Self::get_output, - decrement: Self::decrement, + drop_task: Self::drop_task, destroy: Self::destroy, run: Self::run, }, @@ -181,7 +181,7 @@ where let align_union = layout_f.align().max(layout_r.align()); let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) }; - // Compute the layout for `Header` followed by `T`, then `S`, then `union { F, R }`. + // Compute the layout for `Header` followed by `T`, then `S`, and finally `union { F, R }`. let layout = layout_header; let (layout, offset_t) = extend(layout, layout_t); let (layout, offset_s) = extend(layout, layout_s); @@ -205,10 +205,10 @@ where let mut state = (*raw.header).state.load(Ordering::Acquire); loop { - // If the task is completed or closed, it can't be woken. + // If the task is completed or closed, it can't be woken up. if state & (COMPLETED | CLOSED) != 0 { // Drop the waker. - Self::decrement(ptr); + Self::drop_waker(ptr); break; } @@ -224,7 +224,7 @@ where ) { Ok(_) => { // Drop the waker. - Self::decrement(ptr); + Self::drop_waker(ptr); break; } Err(s) => state = s, @@ -249,7 +249,7 @@ where (*raw.schedule)(task); } else { // Drop the waker. - Self::decrement(ptr); + Self::drop_waker(ptr); } break; @@ -267,7 +267,7 @@ where let mut state = (*raw.header).state.load(Ordering::Acquire); loop { - // If the task is completed or closed, it can't be woken. + // If the task is completed or closed, it can't be woken up. if state & (COMPLETED | CLOSED) != 0 { break; } @@ -330,7 +330,7 @@ where let raw_waker = &(*raw.header).vtable.raw_waker; // Increment the reference count. With any kind of reference-counted data structure, - // relaxed ordering is fine when the reference is being cloned. + // relaxed ordering is appropriate when incrementing the counter. let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); // If the reference count overflowed, abort. @@ -341,19 +341,48 @@ where RawWaker::new(ptr, raw_waker) } - /// Drops a waker or a task. + /// Drops a waker. + /// + /// This function will decrement the reference count. If it drops down to zero, the associated + /// join handle has been dropped too, and the task has not been completed, then it will get + /// scheduled one more time so that its future gets dropped by the executor. + #[inline] + unsafe fn drop_waker(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Decrement the reference count. + let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + + // If this was the last reference to the task and the `JoinHandle` has been dropped too, + // then we need to decide how to destroy the task. + if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 { + if new & (COMPLETED | CLOSED) == 0 { + // If the task was not completed nor closed, close it and schedule one more time so + // that its future gets dropped by the executor. + (*raw.header) + .state + .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); + ((*raw.header).vtable.schedule)(ptr); + } else { + // Otherwise, destroy the task right away. + Self::destroy(ptr); + } + } + } + + /// Drops a task. /// /// This function will decrement the reference count. If it drops down to zero and the /// associated join handle has been dropped too, then the task gets destroyed. #[inline] - unsafe fn decrement(ptr: *const ()) { + unsafe fn drop_task(ptr: *const ()) { let raw = Self::from_ptr(ptr); // Decrement the reference count. let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; - // If this was the last reference to the task and the `JoinHandle` has been dropped as - // well, then destroy the task. + // If this was the last reference to the task and the `JoinHandle` has been dropped too, + // then destroy the task. if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 { Self::destroy(ptr); } @@ -391,8 +420,8 @@ where /// Cleans up task's resources and deallocates it. /// - /// If the task has not been closed, then its future or the output will be dropped. The - /// schedule function and the tag get dropped too. + /// The schedule function and the tag will be dropped, and the task will then get deallocated. + /// The task must be closed before this function is called. #[inline] unsafe fn destroy(ptr: *const ()) { let raw = Self::from_ptr(ptr); @@ -413,8 +442,8 @@ where /// Runs a task. /// - /// If polling its future panics, the task will be closed and the panic propagated into the - /// caller. + /// If polling its future panics, the task will be closed and the panic will be propagated into + /// the caller. unsafe fn run(ptr: *const ()) { let raw = Self::from_ptr(ptr); @@ -429,7 +458,7 @@ where // Update the task's state before polling its future. loop { - // If the task has been closed, drop the task reference and return. + // If the task has already been closed, drop the task reference and return. if state & CLOSED != 0 { // Notify the awaiter that the task has been closed. if state & AWAITER != 0 { @@ -440,7 +469,7 @@ where Self::drop_future(ptr); // Drop the task reference. - Self::decrement(ptr); + Self::drop_task(ptr); return; } @@ -505,7 +534,7 @@ where } // Drop the task reference. - Self::decrement(ptr); + Self::drop_task(ptr); break; } Err(s) => state = s, @@ -519,7 +548,7 @@ where // The task is still not completed. loop { // If the task was closed while running, we'll need to unschedule in case it - // was woken and then clean up its resources. + // was woken up and then destroy it. let new = if state & CLOSED != 0 { state & !RUNNING & !SCHEDULED } else { @@ -535,7 +564,7 @@ where ) { Ok(state) => { // If the task was closed while running, we need to drop its future. - // If the task was woken while running, we need to schedule it. + // If the task was woken up while running, we need to schedule it. // Otherwise, we just drop the task reference. if state & CLOSED != 0 { // The thread that closed the task didn't drop the future because @@ -543,14 +572,14 @@ where Self::drop_future(ptr); // Drop the task reference. - Self::decrement(ptr); + Self::drop_task(ptr); } else if state & SCHEDULED != 0 { - // The thread that has woken the task didn't reschedule it because + // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. Self::schedule(ptr); } else { // Drop the task reference. - Self::decrement(ptr); + Self::drop_task(ptr); } break; } @@ -587,7 +616,7 @@ where // future, and drop the task reference. if state & CLOSED != 0 { // We still need to unschedule the task because it is possible it was - // woken while running. + // woken up while running. (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); // The thread that closed the task didn't drop the future because it @@ -595,7 +624,7 @@ where RawTask::<F, R, S, T>::drop_future(ptr); // Drop the task reference. - RawTask::<F, R, S, T>::decrement(ptr); + RawTask::<F, R, S, T>::drop_task(ptr); break; } @@ -616,7 +645,7 @@ where } // Drop the task reference. - RawTask::<F, R, S, T>::decrement(ptr); + RawTask::<F, R, S, T>::drop_task(ptr); break; } Err(s) => state = s, diff --git a/src/state.rs b/src/state.rs index d6ce34f..c03fea3 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,8 +1,7 @@ /// Set if the task is scheduled for running. /// -/// A task is considered to be scheduled whenever its `Task` reference exists. It is in scheduled -/// state at the moment of creation and when it gets unapused either by its `JoinHandle` or woken -/// by a `Waker`. +/// A task is considered to be scheduled whenever its `Task` reference exists. It therefore also +/// begins in scheduled state at the moment of creation. /// /// This flag can't be set when the task is completed. However, it can be set while the task is /// running, in which case it will be rescheduled as soon as polling finishes. @@ -10,30 +9,30 @@ pub(crate) const SCHEDULED: usize = 1 << 0; /// Set if the task is running. /// -/// A task is running state while its future is being polled. +/// A task is in running state while its future is being polled. /// /// This flag can't be set when the task is completed. However, it can be in scheduled state while -/// it is running, in which case it will be rescheduled when it stops being polled. +/// it is running, in which case it will be rescheduled as soon as polling finishes. pub(crate) const RUNNING: usize = 1 << 1; /// Set if the task has been completed. /// /// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored -/// inside the task until it becomes stopped. In fact, `JoinHandle` picks the output up by marking -/// the task as stopped. +/// inside the task until it becomes closed. In fact, `JoinHandle` picks up the output by marking +/// the task as closed. /// -/// This flag can't be set when the task is scheduled or completed. +/// This flag can't be set when the task is scheduled or running. pub(crate) const COMPLETED: usize = 1 << 2; /// Set if the task is closed. /// -/// If a task is closed, that means its either cancelled or its output has been consumed by the +/// If a task is closed, that means it's either cancelled or its output has been consumed by the /// `JoinHandle`. A task becomes closed when: /// -/// 1. It gets cancelled by `Task::cancel()` or `JoinHandle::cancel()`. -/// 2. Its output is awaited by the `JoinHandle`. +/// 1. It gets cancelled by `Task::cancel()`, `Task::drop()`, or `JoinHandle::cancel()`. +/// 2. Its output gets awaited by the `JoinHandle`. /// 3. It panics while polling the future. -/// 4. It is completed and the `JoinHandle` is dropped. +/// 4. It is completed and the `JoinHandle` gets dropped. pub(crate) const CLOSED: usize = 1 << 3; /// Set if the `JoinHandle` still exists. @@ -51,7 +50,7 @@ pub(crate) const AWAITER: usize = 1 << 5; /// Set if the awaiter is locked. /// -/// This lock is acquired before a new awaiter is registered or the existing one is woken. +/// This lock is acquired before a new awaiter is registered or the existing one is woken up. pub(crate) const LOCKED: usize = 1 << 6; /// A single reference. diff --git a/src/task.rs b/src/task.rs index 9097f44..42a4024 100644 --- a/src/task.rs +++ b/src/task.rs @@ -13,8 +13,8 @@ use crate::JoinHandle; /// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that /// awaits its result. /// -/// When run, the task polls `future`. When woken, it gets scheduled for running by the `schedule` -/// function. Argument `tag` is an arbitrary piece of data stored inside the task. +/// When run, the task polls `future`. When woken up, it gets scheduled for running by the +/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. /// /// [`Task`]: struct.Task.html /// [`JoinHandle`]: struct.JoinHandle.html @@ -29,7 +29,7 @@ use crate::JoinHandle; /// println!("Hello, world!"); /// }; /// -/// // If the task gets woken, it will be sent into this channel. +/// // If the task gets woken up, it will be sent into this channel. /// let (s, r) = channel::unbounded(); /// let schedule = move |task| s.send(task).unwrap(); /// @@ -57,19 +57,19 @@ where /// A task reference that runs its future. /// -/// The [`Task`] reference "owns" the task itself and is able to run it. Running consumes the -/// [`Task`] reference and polls its internal future. If the future is still pending after getting -/// polled, the [`Task`] reference simply won't exist until a [`Waker`] notifies the task. If the -/// future completes, its result becomes available to the [`JoinHandle`]. +/// At any moment in time, there is at most one [`Task`] reference associated with a particular +/// task. Running consumes the [`Task`] reference and polls its internal future. If the future is +/// still pending after getting polled, the [`Task`] reference simply won't exist until a [`Waker`] +/// notifies the task. If the future completes, its result becomes available to the [`JoinHandle`]. /// -/// When the task is woken, the [`Task`] reference is recreated and passed to the schedule +/// When a task is woken up, its [`Task`] reference is recreated and passed to the schedule /// function. In most executors, scheduling simply pushes the [`Task`] reference into a queue of /// runnable tasks. /// -/// If the [`Task`] reference is dropped without being run, the task is cancelled. When cancelled, -/// the task won't be scheduled again even if a [`Waker`] wakes it. It is possible for the -/// [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt to run -/// the task won't do anything. +/// If the [`Task`] reference is dropped without getting run, the task is automatically cancelled. +/// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. It is possible +/// for the [`JoinHandle`] to cancel while the [`Task`] reference exists, in which case an attempt +/// to run the task won't do anything. /// /// [`run()`]: struct.Task.html#method.run /// [`JoinHandle`]: struct.JoinHandle.html @@ -107,16 +107,14 @@ impl<T> Task<T> { /// /// This method polls the task's future. If the future completes, its result will become /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to - /// be woken in order to be rescheduled and then run again. + /// be woken up in order to be rescheduled and run again. /// /// If the task was cancelled by a [`JoinHandle`] before it gets run, then this method won't do /// anything. /// /// It is possible that polling the future panics, in which case the panic will be propagated /// into the caller. It is advised that invocations of this method are wrapped inside - /// [`catch_unwind`]. - /// - /// If a panic occurs, the task is automatically cancelled. + /// [`catch_unwind`]. If a panic occurs, the task is automatically cancelled. /// /// [`JoinHandle`]: struct.JoinHandle.html /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html @@ -170,7 +168,7 @@ impl<T> Drop for Task<T> { ((*header).vtable.drop_future)(ptr); // Drop the task reference. - ((*header).vtable.decrement)(ptr); + ((*header).vtable.drop_task)(ptr); } } } |