aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStjepan Glavina <stjepang@gmail.com>2020-09-18 14:04:58 +0200
committerStjepan Glavina <stjepang@gmail.com>2020-09-18 14:04:58 +0200
commitc533cc6a78f12099143f2230a851abb77d0e67c2 (patch)
treebdd94f790d3e96e0c6f03546e6bffb5792ce22d9 /src
parentceeb5a1742afa244229f731db33bef5a905784c9 (diff)
downloadasync-task-c533cc6a78f12099143f2230a851abb77d0e67c2.tar.gz
Change JoinHandle output type
Diffstat (limited to 'src')
-rw-r--r--src/join_handle.rs258
-rw-r--r--src/lib.rs2
-rw-r--r--src/task.rs8
3 files changed, 147 insertions, 121 deletions
diff --git a/src/join_handle.rs b/src/join_handle.rs
index 745c786..24fef59 100644
--- a/src/join_handle.rs
+++ b/src/join_handle.rs
@@ -16,6 +16,7 @@ use crate::state::*;
///
/// * `None` indicates the task has panicked or was canceled.
/// * `Some(result)` indicates the task has completed with `result` of type `R`.
+#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct JoinHandle<R> {
/// A raw task pointer.
pub(crate) raw_task: NonNull<()>,
@@ -29,156 +30,161 @@ unsafe impl<R> Sync for JoinHandle<R> {}
impl<R> Unpin for JoinHandle<R> {}
+#[cfg(feature = "std")]
+impl<R> std::panic::UnwindSafe for JoinHandle<R> {}
+#[cfg(feature = "std")]
+impl<R> std::panic::RefUnwindSafe for JoinHandle<R> {}
+
impl<R> JoinHandle<R> {
pub fn detach(self) {
- let ptr = self.raw_task.as_ptr();
- mem::forget(self);
- unsafe {
- Self::drop_raw(ptr);
- }
+ let mut this = self;
+ let _out = this.set_detached();
+ mem::forget(this);
}
pub async fn cancel(self) -> Option<R> {
- unsafe {
- Self::cancel_raw(self.raw_task.as_ptr());
+ let mut this = self;
+ this.set_canceled();
+
+ struct Fut<T>(JoinHandle<T>);
+
+ impl<T> Future for Fut<T> {
+ type Output = Option<T>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.0.poll_result(cx)
+ }
}
- self.await
+
+ Fut(this).await
}
- unsafe fn cancel_raw(ptr: *const ()) {
+ fn set_canceled(&mut self) {
+ let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
- let mut state = (*header).state.load(Ordering::Acquire);
+ unsafe {
+ let mut state = (*header).state.load(Ordering::Acquire);
- loop {
- // If the task has been completed or closed, it can't be canceled.
- if state & (COMPLETED | CLOSED) != 0 {
- break;
- }
+ loop {
+ // If the task has been completed or closed, it can't be canceled.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
- // If the task is not scheduled nor running, we'll need to schedule it.
- let new = if state & (SCHEDULED | RUNNING) == 0 {
- (state | SCHEDULED | CLOSED) + REFERENCE
- } else {
- state | CLOSED
- };
-
- // Mark the task as closed.
- match (*header).state.compare_exchange_weak(
- state,
- new,
- Ordering::AcqRel,
- Ordering::Acquire,
- ) {
- Ok(_) => {
- // If the task is not scheduled nor running, schedule it one more time so
- // that its future gets dropped by the executor.
- if state & (SCHEDULED | RUNNING) == 0 {
- ((*header).vtable.schedule)(ptr);
- }
+ // If the task is not scheduled nor running, we'll need to schedule it.
+ let new = if state & (SCHEDULED | RUNNING) == 0 {
+ (state | SCHEDULED | CLOSED) + REFERENCE
+ } else {
+ state | CLOSED
+ };
- // Notify the awaiter that the task has been closed.
- if state & AWAITER != 0 {
- (*header).notify(None);
- }
+ // Mark the task as closed.
+ match (*header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not scheduled nor running, schedule it one more time so
+ // that its future gets dropped by the executor.
+ if state & (SCHEDULED | RUNNING) == 0 {
+ ((*header).vtable.schedule)(ptr);
+ }
- break;
+ // Notify the awaiter that the task has been closed.
+ if state & AWAITER != 0 {
+ (*header).notify(None);
+ }
+
+ break;
+ }
+ Err(s) => state = s,
}
- Err(s) => state = s,
}
}
}
- unsafe fn drop_raw(ptr: *const ()) {
+ fn set_detached(&mut self) -> Option<R> {
+ let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
- // A place where the output will be stored in case it needs to be dropped.
- let mut output = None;
-
- // Optimistically assume the `JoinHandle` is being detached just after creating the
- // task. This is a common case so if the handle is not used, the overhead of it is only
- // one compare-exchange operation.
- if let Err(mut state) = (*header).state.compare_exchange_weak(
- SCHEDULED | HANDLE | REFERENCE,
- SCHEDULED | REFERENCE,
- Ordering::AcqRel,
- Ordering::Acquire,
- ) {
- loop {
- // If the task has been completed but not yet closed, that means its output
- // must be dropped.
- if state & COMPLETED != 0 && state & CLOSED == 0 {
- // Mark the task as closed in order to grab its output.
- match (*header).state.compare_exchange_weak(
- state,
- state | CLOSED,
- Ordering::AcqRel,
- Ordering::Acquire,
- ) {
- Ok(_) => {
- // Read the output.
- output = Some((((*header).vtable.get_output)(ptr) as *mut R).read());
-
- // Update the state variable because we're continuing the loop.
- state |= CLOSED;
+ unsafe {
+ // A place where the output will be stored in case it needs to be dropped.
+ let mut output = None;
+
+ // Optimistically assume the `JoinHandle` is being detached just after creating the
+ // task. This is a common case so if the handle is not used, the overhead of it is only
+ // one compare-exchange operation.
+ if let Err(mut state) = (*header).state.compare_exchange_weak(
+ SCHEDULED | HANDLE | REFERENCE,
+ SCHEDULED | REFERENCE,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ loop {
+ // If the task has been completed but not yet closed, that means its output
+ // must be dropped.
+ if state & COMPLETED != 0 && state & CLOSED == 0 {
+ // Mark the task as closed in order to grab its output.
+ match (*header).state.compare_exchange_weak(
+ state,
+ state | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Read the output.
+ output =
+ Some((((*header).vtable.get_output)(ptr) as *mut R).read());
+
+ // Update the state variable because we're continuing the loop.
+ state |= CLOSED;
+ }
+ Err(s) => state = s,
}
- Err(s) => state = s,
- }
- } else {
- // If this is the last reference to the task and it's not closed, then
- // close it and schedule one more time so that its future gets dropped by
- // the executor.
- let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
- SCHEDULED | CLOSED | REFERENCE
} else {
- state & !HANDLE
- };
-
- // Unset the handle flag.
- match (*header).state.compare_exchange_weak(
- state,
- new,
- Ordering::AcqRel,
- Ordering::Acquire,
- ) {
- Ok(_) => {
- // If this is the last reference to the task, we need to either
- // schedule dropping its future or destroy it.
- if state & !(REFERENCE - 1) == 0 {
- if state & CLOSED == 0 {
- ((*header).vtable.schedule)(ptr);
- } else {
- ((*header).vtable.destroy)(ptr);
+ // If this is the last reference to the task and it's not closed, then
+ // close it and schedule one more time so that its future gets dropped by
+ // the executor.
+ let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
+ SCHEDULED | CLOSED | REFERENCE
+ } else {
+ state & !HANDLE
+ };
+
+ // Unset the handle flag.
+ match (*header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If this is the last reference to the task, we need to either
+ // schedule dropping its future or destroy it.
+ if state & !(REFERENCE - 1) == 0 {
+ if state & CLOSED == 0 {
+ ((*header).vtable.schedule)(ptr);
+ } else {
+ ((*header).vtable.destroy)(ptr);
+ }
}
- }
- break;
+ break;
+ }
+ Err(s) => state = s,
}
- Err(s) => state = s,
}
}
}
- }
- // Drop the output if it was taken out of the task.
- drop(output);
- }
-}
-
-impl<R> Drop for JoinHandle<R> {
- fn drop(&mut self) {
- let ptr = self.raw_task.as_ptr();
- unsafe {
- Self::cancel_raw(ptr);
- Self::drop_raw(ptr);
+ output
}
}
-}
-impl<R> Future for JoinHandle<R> {
- type Output = Option<R>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<Option<R>> {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
@@ -256,6 +262,24 @@ impl<R> Future for JoinHandle<R> {
}
}
+impl<R> Drop for JoinHandle<R> {
+ fn drop(&mut self) {
+ self.set_canceled();
+ self.set_detached();
+ }
+}
+
+impl<R> Future for JoinHandle<R> {
+ type Output = R;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.poll_result(cx) {
+ Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
impl<R> fmt::Debug for JoinHandle<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_task.as_ptr();
diff --git a/src/lib.rs b/src/lib.rs
index e3a979a..9b2e174 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -97,7 +97,7 @@
//! [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
//! [`block_on`]: https://github.com/async-rs/async-task/blob/master/examples/block.rs
-#![no_std]
+#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
diff --git a/src/task.rs b/src/task.rs
index 0f7e4ea..7374cf4 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -115,13 +115,10 @@ where
R: 'static,
S: Fn(Task) + Send + Sync + 'static,
{
- extern crate std;
-
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::{self, ThreadId};
- use std::thread_local;
#[inline]
fn thread_id() -> ThreadId {
@@ -211,6 +208,11 @@ pub struct Task {
unsafe impl Send for Task {}
unsafe impl Sync for Task {}
+#[cfg(feature = "std")]
+impl std::panic::UnwindSafe for Task {}
+#[cfg(feature = "std")]
+impl std::panic::RefUnwindSafe for Task {}
+
impl Task {
/// Schedules the task.
///