diff options
author | Stjepan Glavina <stjepang@gmail.com> | 2020-09-18 14:04:58 +0200 |
---|---|---|
committer | Stjepan Glavina <stjepang@gmail.com> | 2020-09-18 14:04:58 +0200 |
commit | c533cc6a78f12099143f2230a851abb77d0e67c2 (patch) | |
tree | bdd94f790d3e96e0c6f03546e6bffb5792ce22d9 /src | |
parent | ceeb5a1742afa244229f731db33bef5a905784c9 (diff) | |
download | async-task-c533cc6a78f12099143f2230a851abb77d0e67c2.tar.gz |
Change JoinHandle output type
Diffstat (limited to 'src')
-rw-r--r-- | src/join_handle.rs | 258 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/task.rs | 8 |
3 files changed, 147 insertions, 121 deletions
diff --git a/src/join_handle.rs b/src/join_handle.rs index 745c786..24fef59 100644 --- a/src/join_handle.rs +++ b/src/join_handle.rs @@ -16,6 +16,7 @@ use crate::state::*; /// /// * `None` indicates the task has panicked or was canceled. /// * `Some(result)` indicates the task has completed with `result` of type `R`. +#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] pub struct JoinHandle<R> { /// A raw task pointer. pub(crate) raw_task: NonNull<()>, @@ -29,156 +30,161 @@ unsafe impl<R> Sync for JoinHandle<R> {} impl<R> Unpin for JoinHandle<R> {} +#[cfg(feature = "std")] +impl<R> std::panic::UnwindSafe for JoinHandle<R> {} +#[cfg(feature = "std")] +impl<R> std::panic::RefUnwindSafe for JoinHandle<R> {} + impl<R> JoinHandle<R> { pub fn detach(self) { - let ptr = self.raw_task.as_ptr(); - mem::forget(self); - unsafe { - Self::drop_raw(ptr); - } + let mut this = self; + let _out = this.set_detached(); + mem::forget(this); } pub async fn cancel(self) -> Option<R> { - unsafe { - Self::cancel_raw(self.raw_task.as_ptr()); + let mut this = self; + this.set_canceled(); + + struct Fut<T>(JoinHandle<T>); + + impl<T> Future for Fut<T> { + type Output = Option<T>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.0.poll_result(cx) + } } - self.await + + Fut(this).await } - unsafe fn cancel_raw(ptr: *const ()) { + fn set_canceled(&mut self) { + let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; - let mut state = (*header).state.load(Ordering::Acquire); + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); - loop { - // If the task has been completed or closed, it can't be canceled. - if state & (COMPLETED | CLOSED) != 0 { - break; - } + loop { + // If the task has been completed or closed, it can't be canceled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } - // If the task is not scheduled nor running, we'll need to schedule it. - let new = if state & (SCHEDULED | RUNNING) == 0 { - (state | SCHEDULED | CLOSED) + REFERENCE - } else { - state | CLOSED - }; - - // Mark the task as closed. - match (*header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not scheduled nor running, schedule it one more time so - // that its future gets dropped by the executor. - if state & (SCHEDULED | RUNNING) == 0 { - ((*header).vtable.schedule)(ptr); - } + // If the task is not scheduled nor running, we'll need to schedule it. + let new = if state & (SCHEDULED | RUNNING) == 0 { + (state | SCHEDULED | CLOSED) + REFERENCE + } else { + state | CLOSED + }; - // Notify the awaiter that the task has been closed. - if state & AWAITER != 0 { - (*header).notify(None); - } + // Mark the task as closed. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not scheduled nor running, schedule it one more time so + // that its future gets dropped by the executor. + if state & (SCHEDULED | RUNNING) == 0 { + ((*header).vtable.schedule)(ptr); + } - break; + // Notify the awaiter that the task has been closed. + if state & AWAITER != 0 { + (*header).notify(None); + } + + break; + } + Err(s) => state = s, } - Err(s) => state = s, } } } - unsafe fn drop_raw(ptr: *const ()) { + fn set_detached(&mut self) -> Option<R> { + let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; - // A place where the output will be stored in case it needs to be dropped. - let mut output = None; - - // Optimistically assume the `JoinHandle` is being detached just after creating the - // task. This is a common case so if the handle is not used, the overhead of it is only - // one compare-exchange operation. - if let Err(mut state) = (*header).state.compare_exchange_weak( - SCHEDULED | HANDLE | REFERENCE, - SCHEDULED | REFERENCE, - Ordering::AcqRel, - Ordering::Acquire, - ) { - loop { - // If the task has been completed but not yet closed, that means its output - // must be dropped. - if state & COMPLETED != 0 && state & CLOSED == 0 { - // Mark the task as closed in order to grab its output. - match (*header).state.compare_exchange_weak( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Read the output. - output = Some((((*header).vtable.get_output)(ptr) as *mut R).read()); - - // Update the state variable because we're continuing the loop. - state |= CLOSED; + unsafe { + // A place where the output will be stored in case it needs to be dropped. + let mut output = None; + + // Optimistically assume the `JoinHandle` is being detached just after creating the + // task. This is a common case so if the handle is not used, the overhead of it is only + // one compare-exchange operation. + if let Err(mut state) = (*header).state.compare_exchange_weak( + SCHEDULED | HANDLE | REFERENCE, + SCHEDULED | REFERENCE, + Ordering::AcqRel, + Ordering::Acquire, + ) { + loop { + // If the task has been completed but not yet closed, that means its output + // must be dropped. + if state & COMPLETED != 0 && state & CLOSED == 0 { + // Mark the task as closed in order to grab its output. + match (*header).state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Read the output. + output = + Some((((*header).vtable.get_output)(ptr) as *mut R).read()); + + // Update the state variable because we're continuing the loop. + state |= CLOSED; + } + Err(s) => state = s, } - Err(s) => state = s, - } - } else { - // If this is the last reference to the task and it's not closed, then - // close it and schedule one more time so that its future gets dropped by - // the executor. - let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { - SCHEDULED | CLOSED | REFERENCE } else { - state & !HANDLE - }; - - // Unset the handle flag. - match (*header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If this is the last reference to the task, we need to either - // schedule dropping its future or destroy it. - if state & !(REFERENCE - 1) == 0 { - if state & CLOSED == 0 { - ((*header).vtable.schedule)(ptr); - } else { - ((*header).vtable.destroy)(ptr); + // If this is the last reference to the task and it's not closed, then + // close it and schedule one more time so that its future gets dropped by + // the executor. + let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { + SCHEDULED | CLOSED | REFERENCE + } else { + state & !HANDLE + }; + + // Unset the handle flag. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If this is the last reference to the task, we need to either + // schedule dropping its future or destroy it. + if state & !(REFERENCE - 1) == 0 { + if state & CLOSED == 0 { + ((*header).vtable.schedule)(ptr); + } else { + ((*header).vtable.destroy)(ptr); + } } - } - break; + break; + } + Err(s) => state = s, } - Err(s) => state = s, } } } - } - // Drop the output if it was taken out of the task. - drop(output); - } -} - -impl<R> Drop for JoinHandle<R> { - fn drop(&mut self) { - let ptr = self.raw_task.as_ptr(); - unsafe { - Self::cancel_raw(ptr); - Self::drop_raw(ptr); + output } } -} -impl<R> Future for JoinHandle<R> { - type Output = Option<R>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<Option<R>> { let ptr = self.raw_task.as_ptr(); let header = ptr as *const Header; @@ -256,6 +262,24 @@ impl<R> Future for JoinHandle<R> { } } +impl<R> Drop for JoinHandle<R> { + fn drop(&mut self) { + self.set_canceled(); + self.set_detached(); + } +} + +impl<R> Future for JoinHandle<R> { + type Output = R; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match self.poll_result(cx) { + Poll::Ready(t) => Poll::Ready(t.expect("task has failed")), + Poll::Pending => Poll::Pending, + } + } +} + impl<R> fmt::Debug for JoinHandle<R> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ptr = self.raw_task.as_ptr(); @@ -97,7 +97,7 @@ //! [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html //! [`block_on`]: https://github.com/async-rs/async-task/blob/master/examples/block.rs -#![no_std] +#![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc(test(attr(deny(rust_2018_idioms, warnings))))] #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] diff --git a/src/task.rs b/src/task.rs index 0f7e4ea..7374cf4 100644 --- a/src/task.rs +++ b/src/task.rs @@ -115,13 +115,10 @@ where R: 'static, S: Fn(Task) + Send + Sync + 'static, { - extern crate std; - use std::mem::ManuallyDrop; use std::pin::Pin; use std::task::{Context, Poll}; use std::thread::{self, ThreadId}; - use std::thread_local; #[inline] fn thread_id() -> ThreadId { @@ -211,6 +208,11 @@ pub struct Task { unsafe impl Send for Task {} unsafe impl Sync for Task {} +#[cfg(feature = "std")] +impl std::panic::UnwindSafe for Task {} +#[cfg(feature = "std")] +impl std::panic::RefUnwindSafe for Task {} + impl Task { /// Schedules the task. /// |