aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStjepan Glavina <stjepang@gmail.com>2020-09-20 02:12:51 +0200
committerStjepan Glavina <stjepang@gmail.com>2020-09-20 02:12:51 +0200
commitc5604c3eff41477fefcf9393f2e52924475442d9 (patch)
tree74de9a610ef24081beef6757f795013ab7c8e90a /src
parent59a5a260c6b5fea0f39d724b23972da49af0f5d3 (diff)
downloadasync-task-c5604c3eff41477fefcf9393f2e52924475442d9.tar.gz
More docs and small fixes
Diffstat (limited to 'src')
-rw-r--r--src/header.rs23
-rw-r--r--src/raw.rs61
-rw-r--r--src/runnable.rs131
-rw-r--r--src/task.rs8
4 files changed, 160 insertions, 63 deletions
diff --git a/src/header.rs b/src/header.rs
index 9c4960d..8a3a0b9 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -34,6 +34,16 @@ impl Header {
/// If the awaiter is the same as the current waker, it will not be notified.
#[inline]
pub(crate) fn notify(&self, current: Option<&Waker>) {
+ if let Some(w) = self.take(current) {
+ abort_on_panic(|| w.wake());
+ }
+ }
+
+ /// Takes the awaiter blocked on this task.
+ ///
+ /// If there is no awaiter or if it is the same as the current waker, returns `None`.
+ #[inline]
+ pub(crate) fn take(&self, current: Option<&Waker>) -> Option<Waker> {
// Set the bit indicating that the task is notifying its awaiter.
let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
@@ -48,14 +58,15 @@ impl Header {
// Finally, notify the waker if it's different from the current waker.
if let Some(w) = waker {
- // We need a safeguard against panics because waking can panic.
- abort_on_panic(|| match current {
- None => w.wake(),
- Some(c) if !w.will_wake(c) => w.wake(),
- Some(_) => {}
- });
+ match current {
+ None => return Some(w),
+ Some(c) if !w.will_wake(c) => return Some(w),
+ Some(_) => abort_on_panic(|| drop(w)),
+ }
}
}
+
+ None
}
/// Registers a new awaiter blocked on this task.
diff --git a/src/raw.rs b/src/raw.rs
index c397bc1..9b5854c 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -451,13 +451,19 @@ where
// Mark the task as unscheduled.
let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
- // Notify the awaiter that the future has been dropped.
+ // Take the awaiter out.
+ let mut awaiter = None;
if state & AWAITER != 0 {
- (*raw.header).notify(None);
+ awaiter = (*raw.header).take(None);
}
// Drop the task reference.
Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
return false;
}
@@ -489,9 +495,6 @@ where
Self::drop_future(ptr);
raw.output.write(out);
- // A place where the output will be stored in case it needs to be dropped.
- let mut output = None;
-
// The task is now completed.
loop {
// If the `Task` is dropped, we'll need to close it and drop the output.
@@ -512,25 +515,28 @@ where
// If the `Task` is dropped or if the task was closed while running,
// now it's time to drop the output.
if state & TASK == 0 || state & CLOSED != 0 {
- // Read the output.
- output = Some(raw.output.read());
+ // Drop the output.
+ abort_on_panic(|| raw.output.drop_in_place());
}
- // Notify the awaiter that the task has been completed.
+ // Take the awaiter out.
+ let mut awaiter = None;
if state & AWAITER != 0 {
- (*raw.header).notify(None);
+ awaiter = (*raw.header).take(None);
}
// Drop the task reference.
Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
break;
}
Err(s) => state = s,
}
}
-
- // Drop the output if it was taken out of the task.
- drop(output);
}
Poll::Pending => {
let mut future_dropped = false;
@@ -564,12 +570,19 @@ where
// If the task was woken up while running, we need to schedule it.
// Otherwise, we just drop the task reference.
if state & CLOSED != 0 {
- // Notify the awaiter that the future has been dropped.
+ // Take the awaiter out.
+ let mut awaiter = None;
if state & AWAITER != 0 {
- (*raw.header).notify(None);
+ awaiter = (*raw.header).take(None);
}
+
// Drop the task reference.
Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
} else if state & SCHEDULED != 0 {
// The thread that woke the task up didn't reschedule it because
// it was running so now it's our responsibility to do so.
@@ -620,13 +633,19 @@ where
.state
.fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
- // Notify the awaiter that the future has been dropped.
+ // Take the awaiter out.
+ let mut awaiter = None;
if state & AWAITER != 0 {
- (*raw.header).notify(None);
+ awaiter = (*raw.header).take(None);
}
// Drop the task reference.
RawTask::<F, T, S>::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
break;
}
@@ -641,13 +660,19 @@ where
// Drop the future because the task is now closed.
RawTask::<F, T, S>::drop_future(ptr);
- // Notify the awaiter that the future has been dropped.
+ // Take the awaiter out.
+ let mut awaiter = None;
if state & AWAITER != 0 {
- (*raw.header).notify(None);
+ awaiter = (*raw.header).take(None);
}
// Drop the task reference.
RawTask::<F, T, S>::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
break;
}
Err(s) => state = s,
diff --git a/src/runnable.rs b/src/runnable.rs
index b26e32e..be3e25f 100644
--- a/src/runnable.rs
+++ b/src/runnable.rs
@@ -16,8 +16,9 @@ use crate::Task;
/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
/// output.
///
-/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and
-/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again.
+/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
+/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
+/// again.
///
/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
@@ -191,30 +192,44 @@ where
(runnable, task)
}
-/// A task reference that runs its future.
+/// A handle to a runnable task.
///
-/// At any moment in time, there is at most one [`Runnable`] reference associated with a particular
-/// task. Running consumes the [`Runnable`] reference and polls its internal future. If the future
-/// is still pending after getting polled, the [`Runnable`] reference simply won't exist until a
-/// [`Waker`] notifies the task. If the future completes, its result becomes available to the
-/// [`Task`].
+/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is
+/// scheduled for running.
///
-/// When a task is woken up, its [`Runnable`] reference is recreated and passed to the schedule
-/// function. In most executors, scheduling simply pushes the [`Runnable`] reference into a queue
-/// of runnable tasks.
+/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
+/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
+/// again.
///
-/// If the [`Runnable`] reference is dropped without getting run, the task is automatically
-/// canceled. When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. It is
-/// possible for the [`Task`] to cancel while the [`Runnable`] reference exists, in which
-/// case an attempt to run the task won't do anything.
+/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and
+/// awaiting the [`Task`] after that will result in a panic.
///
-/// ----------------
+/// # Examples
+///
+/// ```
+/// use async_task::Runnable;
+/// use once_cell::sync::Lazy;
+/// use std::{panic, thread};
+///
+/// // A simple executor.
+/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
+/// let (sender, receiver) = flume::unbounded::<Runnable>();
+/// thread::spawn(|| {
+/// for runnable in receiver {
+/// let _ignore_panic = panic::catch_unwind(|| runnable.run());
+/// }
+/// });
+/// sender
+/// });
///
-/// A runnable future, ready for execution.
+/// // Create a task with a simple future.
+/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
+/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
///
-/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's
-/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task
-/// queue in an executor.
+/// // Schedule the task and await its output.
+/// runnable.schedule();
+/// assert_eq!(smol::future::block_on(task), 3);
+/// ```
pub struct Runnable {
/// A pointer to the heap-allocated task.
pub(crate) ptr: NonNull<()>,
@@ -231,10 +246,23 @@ impl std::panic::RefUnwindSafe for Runnable {}
impl Runnable {
/// Schedules the task.
///
- /// This is a convenience method that simply reschedules the task by passing it to its schedule
- /// function.
+ /// This is a convenience method that passes the [`Runnable`] to the schedule function.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
///
- /// If the task is canceled, this method won't do anything.
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(async {}, schedule);
+ ///
+ /// // Schedule the task.
+ /// assert_eq!(r.len(), 0);
+ /// runnable.schedule();
+ /// assert_eq!(r.len(), 1);
+ /// ```
pub fn schedule(self) {
let ptr = self.ptr.as_ptr();
let header = ptr as *const Header;
@@ -245,22 +273,33 @@ impl Runnable {
}
}
- /// Runs the task.
+ /// Runs the task by polling its future.
+ ///
+ /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
+ /// rescheduled at the end of this method invocation.
///
- /// Returns `true` if the task was woken while running, in which case it gets rescheduled at
- /// the end of this method invocation.
+ /// Otherwise, returns `true` and the [`Runnable`] vanishes until the task is woken.
///
- /// This method polls the task's future. If the future completes, its result will become
- /// available to the [`Task`]. And if the future is still pending, the task will have to
- /// be woken up in order to be rescheduled and run again.
+ /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
+ /// this method simply destroys the task.
///
- /// If the task was canceled by a [`Task`] before it gets run, then this method won't do
- /// anything.
+ /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
+ /// after that will also result in a panic.
///
- /// It is possible that polling the future panics, in which case the panic will be propagated
- /// into the caller. It is advised that invocations of this method are wrapped inside
- /// [`catch_unwind`][`std::panic::catch_unwind`]. If a panic occurs, the task is automatically
- /// canceled.
+ /// # Examples
+ ///
+ /// ```
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
+ ///
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
+ ///
+ /// // Run the task and check its output.
+ /// runnable.run();
+ /// assert_eq!(smol::future::block_on(task), 3);
+ /// ```
pub fn run(self) -> bool {
let ptr = self.ptr.as_ptr();
let header = ptr as *const Header;
@@ -270,6 +309,28 @@ impl Runnable {
}
/// Returns a waker associated with this task.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use smol::future;
+ ///
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
+ ///
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
+ ///
+ /// // Take a waker and run the task.
+ /// let waker = runnable.waker();
+ /// runnable.run();
+ ///
+ /// // Reschedule the task by waking it.
+ /// assert_eq!(r.len(), 0);
+ /// waker.wake();
+ /// assert_eq!(r.len(), 1);
+ /// ```
pub fn waker(&self) -> Waker {
let ptr = self.ptr.as_ptr();
let header = ptr as *const Header;
diff --git a/src/task.rs b/src/task.rs
index 0f14e82..bd6bec3 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -14,10 +14,10 @@ use crate::state::*;
///
/// A [`Task`] can be awaited to retrieve the output of its future.
///
-/// Dropping a [`Task`] cancels it, which means its future won't be polled again.
-/// To drop the [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead.
-/// To cancel a task gracefully and wait until it is fully destroyed, use the
-/// [`cancel()`][Task::cancel()] method.
+/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
+/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
+/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
+/// method.
///
/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
/// can destroy the task by simply dropping its [`Runnable`][`crate::Runnable`] or by invoking