diff options
author | Stjepan Glavina <stjepang@gmail.com> | 2020-09-20 02:12:51 +0200 |
---|---|---|
committer | Stjepan Glavina <stjepang@gmail.com> | 2020-09-20 02:12:51 +0200 |
commit | c5604c3eff41477fefcf9393f2e52924475442d9 (patch) | |
tree | 74de9a610ef24081beef6757f795013ab7c8e90a /src | |
parent | 59a5a260c6b5fea0f39d724b23972da49af0f5d3 (diff) | |
download | async-task-c5604c3eff41477fefcf9393f2e52924475442d9.tar.gz |
More docs and small fixes
Diffstat (limited to 'src')
-rw-r--r-- | src/header.rs | 23 | ||||
-rw-r--r-- | src/raw.rs | 61 | ||||
-rw-r--r-- | src/runnable.rs | 131 | ||||
-rw-r--r-- | src/task.rs | 8 |
4 files changed, 160 insertions, 63 deletions
diff --git a/src/header.rs b/src/header.rs index 9c4960d..8a3a0b9 100644 --- a/src/header.rs +++ b/src/header.rs @@ -34,6 +34,16 @@ impl Header { /// If the awaiter is the same as the current waker, it will not be notified. #[inline] pub(crate) fn notify(&self, current: Option<&Waker>) { + if let Some(w) = self.take(current) { + abort_on_panic(|| w.wake()); + } + } + + /// Takes the awaiter blocked on this task. + /// + /// If there is no awaiter or if it is the same as the current waker, returns `None`. + #[inline] + pub(crate) fn take(&self, current: Option<&Waker>) -> Option<Waker> { // Set the bit indicating that the task is notifying its awaiter. let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel); @@ -48,14 +58,15 @@ impl Header { // Finally, notify the waker if it's different from the current waker. if let Some(w) = waker { - // We need a safeguard against panics because waking can panic. - abort_on_panic(|| match current { - None => w.wake(), - Some(c) if !w.will_wake(c) => w.wake(), - Some(_) => {} - }); + match current { + None => return Some(w), + Some(c) if !w.will_wake(c) => return Some(w), + Some(_) => abort_on_panic(|| drop(w)), + } } } + + None } /// Registers a new awaiter blocked on this task. @@ -451,13 +451,19 @@ where // Mark the task as unscheduled. let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); - // Notify the awaiter that the future has been dropped. + // Take the awaiter out. + let mut awaiter = None; if state & AWAITER != 0 { - (*raw.header).notify(None); + awaiter = (*raw.header).take(None); } // Drop the task reference. Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } return false; } @@ -489,9 +495,6 @@ where Self::drop_future(ptr); raw.output.write(out); - // A place where the output will be stored in case it needs to be dropped. - let mut output = None; - // The task is now completed. loop { // If the `Task` is dropped, we'll need to close it and drop the output. @@ -512,25 +515,28 @@ where // If the `Task` is dropped or if the task was closed while running, // now it's time to drop the output. if state & TASK == 0 || state & CLOSED != 0 { - // Read the output. - output = Some(raw.output.read()); + // Drop the output. + abort_on_panic(|| raw.output.drop_in_place()); } - // Notify the awaiter that the task has been completed. + // Take the awaiter out. + let mut awaiter = None; if state & AWAITER != 0 { - (*raw.header).notify(None); + awaiter = (*raw.header).take(None); } // Drop the task reference. Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } break; } Err(s) => state = s, } } - - // Drop the output if it was taken out of the task. - drop(output); } Poll::Pending => { let mut future_dropped = false; @@ -564,12 +570,19 @@ where // If the task was woken up while running, we need to schedule it. // Otherwise, we just drop the task reference. if state & CLOSED != 0 { - // Notify the awaiter that the future has been dropped. + // Take the awaiter out. + let mut awaiter = None; if state & AWAITER != 0 { - (*raw.header).notify(None); + awaiter = (*raw.header).take(None); } + // Drop the task reference. Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. @@ -620,13 +633,19 @@ where .state .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); - // Notify the awaiter that the future has been dropped. + // Take the awaiter out. + let mut awaiter = None; if state & AWAITER != 0 { - (*raw.header).notify(None); + awaiter = (*raw.header).take(None); } // Drop the task reference. RawTask::<F, T, S>::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } break; } @@ -641,13 +660,19 @@ where // Drop the future because the task is now closed. RawTask::<F, T, S>::drop_future(ptr); - // Notify the awaiter that the future has been dropped. + // Take the awaiter out. + let mut awaiter = None; if state & AWAITER != 0 { - (*raw.header).notify(None); + awaiter = (*raw.header).take(None); } // Drop the task reference. RawTask::<F, T, S>::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } break; } Err(s) => state = s, diff --git a/src/runnable.rs b/src/runnable.rs index b26e32e..be3e25f 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -16,8 +16,9 @@ use crate::Task; /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its /// output. /// -/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and -/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again. +/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run +/// again. /// /// When the task is woken, its [`Runnable`] is passed to the `schedule` function. /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it @@ -191,30 +192,44 @@ where (runnable, task) } -/// A task reference that runs its future. +/// A handle to a runnable task. /// -/// At any moment in time, there is at most one [`Runnable`] reference associated with a particular -/// task. Running consumes the [`Runnable`] reference and polls its internal future. If the future -/// is still pending after getting polled, the [`Runnable`] reference simply won't exist until a -/// [`Waker`] notifies the task. If the future completes, its result becomes available to the -/// [`Task`]. +/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is +/// scheduled for running. /// -/// When a task is woken up, its [`Runnable`] reference is recreated and passed to the schedule -/// function. In most executors, scheduling simply pushes the [`Runnable`] reference into a queue -/// of runnable tasks. +/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run +/// again. /// -/// If the [`Runnable`] reference is dropped without getting run, the task is automatically -/// canceled. When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. It is -/// possible for the [`Task`] to cancel while the [`Runnable`] reference exists, in which -/// case an attempt to run the task won't do anything. +/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and +/// awaiting the [`Task`] after that will result in a panic. /// -/// ---------------- +/// # Examples +/// +/// ``` +/// use async_task::Runnable; +/// use once_cell::sync::Lazy; +/// use std::{panic, thread}; +/// +/// // A simple executor. +/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| { +/// let (sender, receiver) = flume::unbounded::<Runnable>(); +/// thread::spawn(|| { +/// for runnable in receiver { +/// let _ignore_panic = panic::catch_unwind(|| runnable.run()); +/// } +/// }); +/// sender +/// }); /// -/// A runnable future, ready for execution. +/// // Create a task with a simple future. +/// let schedule = |runnable| QUEUE.send(runnable).unwrap(); +/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); /// -/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's -/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task -/// queue in an executor. +/// // Schedule the task and await its output. +/// runnable.schedule(); +/// assert_eq!(smol::future::block_on(task), 3); +/// ``` pub struct Runnable { /// A pointer to the heap-allocated task. pub(crate) ptr: NonNull<()>, @@ -231,10 +246,23 @@ impl std::panic::RefUnwindSafe for Runnable {} impl Runnable { /// Schedules the task. /// - /// This is a convenience method that simply reschedules the task by passing it to its schedule - /// function. + /// This is a convenience method that passes the [`Runnable`] to the schedule function. + /// + /// # Examples + /// + /// ``` + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); /// - /// If the task is canceled, this method won't do anything. + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(async {}, schedule); + /// + /// // Schedule the task. + /// assert_eq!(r.len(), 0); + /// runnable.schedule(); + /// assert_eq!(r.len(), 1); + /// ``` pub fn schedule(self) { let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; @@ -245,22 +273,33 @@ impl Runnable { } } - /// Runs the task. + /// Runs the task by polling its future. + /// + /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets + /// rescheduled at the end of this method invocation. /// - /// Returns `true` if the task was woken while running, in which case it gets rescheduled at - /// the end of this method invocation. + /// Otherwise, returns `true` and the [`Runnable`] vanishes until the task is woken. /// - /// This method polls the task's future. If the future completes, its result will become - /// available to the [`Task`]. And if the future is still pending, the task will have to - /// be woken up in order to be rescheduled and run again. + /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then + /// this method simply destroys the task. /// - /// If the task was canceled by a [`Task`] before it gets run, then this method won't do - /// anything. + /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`] + /// after that will also result in a panic. /// - /// It is possible that polling the future panics, in which case the panic will be propagated - /// into the caller. It is advised that invocations of this method are wrapped inside - /// [`catch_unwind`][`std::panic::catch_unwind`]. If a panic occurs, the task is automatically - /// canceled. + /// # Examples + /// + /// ``` + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); + /// + /// // Run the task and check its output. + /// runnable.run(); + /// assert_eq!(smol::future::block_on(task), 3); + /// ``` pub fn run(self) -> bool { let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; @@ -270,6 +309,28 @@ impl Runnable { } /// Returns a waker associated with this task. + /// + /// # Examples + /// + /// ``` + /// use smol::future; + /// + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule); + /// + /// // Take a waker and run the task. + /// let waker = runnable.waker(); + /// runnable.run(); + /// + /// // Reschedule the task by waking it. + /// assert_eq!(r.len(), 0); + /// waker.wake(); + /// assert_eq!(r.len(), 1); + /// ``` pub fn waker(&self) -> Waker { let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; diff --git a/src/task.rs b/src/task.rs index 0f14e82..bd6bec3 100644 --- a/src/task.rs +++ b/src/task.rs @@ -14,10 +14,10 @@ use crate::state::*; /// /// A [`Task`] can be awaited to retrieve the output of its future. /// -/// Dropping a [`Task`] cancels it, which means its future won't be polled again. -/// To drop the [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. -/// To cancel a task gracefully and wait until it is fully destroyed, use the -/// [`cancel()`][Task::cancel()] method. +/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the +/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a +/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()] +/// method. /// /// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor /// can destroy the task by simply dropping its [`Runnable`][`crate::Runnable`] or by invoking |