diff options
author | Chris Wailes <chriswailes@google.com> | 2022-12-12 11:43:41 -0800 |
---|---|---|
committer | Jeff Vander Stoep <jeffv@google.com> | 2023-01-18 19:48:52 +0100 |
commit | ff62579fde0625f6c8923b58c9dc848c97c680e6 (patch) | |
tree | c049adb6c0fca041cbb303c8311c1084e4a832cd /src/runtime/task/harness.rs | |
parent | b669ae94fdcda726d88936d028d35187bf41b016 (diff) | |
download | tokio-ff62579fde0625f6c8923b58c9dc848c97c680e6.tar.gz |
Upgrade tokio to 1.23.0
This project was upgraded with external_updater.
Usage: tools/external_updater/updater.sh update rust/crates/tokio
For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
Test: TreeHugger
Change-Id: Id69553d5e858bddcde0de5b9e72d6bb3c08bafb5
Diffstat (limited to 'src/runtime/task/harness.rs')
-rw-r--r-- | src/runtime/task/harness.rs | 279 |
1 files changed, 153 insertions, 126 deletions
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs index 0996e52..c079297 100644 --- a/src/runtime/task/harness.rs +++ b/src/runtime/task/harness.rs @@ -1,8 +1,8 @@ use crate::future::Future; -use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer}; -use crate::runtime::task::state::Snapshot; +use crate::runtime::task::core::{Cell, Core, Header, Trailer}; +use crate::runtime::task::state::{Snapshot, State}; use crate::runtime::task::waker::waker_ref; -use crate::runtime::task::{JoinError, Notified, Schedule, Task}; +use crate::runtime::task::{JoinError, Notified, RawTask, Schedule, Task}; use std::mem; use std::mem::ManuallyDrop; @@ -26,8 +26,16 @@ where } } + fn header_ptr(&self) -> NonNull<Header> { + self.cell.cast() + } + fn header(&self) -> &Header { - unsafe { &self.cell.as_ref().header } + unsafe { &*self.header_ptr().as_ptr() } + } + + fn state(&self) -> &State { + &self.header().state } fn trailer(&self) -> &Trailer { @@ -39,11 +47,102 @@ where } } +/// Task operations that can be implemented without being generic over the +/// scheduler or task. Only one version of these methods should exist in the +/// final binary. +impl RawTask { + pub(super) fn drop_reference(self) { + if self.state().ref_dec() { + self.dealloc(); + } + } + + /// This call consumes a ref-count and notifies the task. This will create a + /// new Notified and submit it if necessary. + /// + /// The caller does not need to hold a ref-count besides the one that was + /// passed to this call. + pub(super) fn wake_by_val(&self) { + use super::state::TransitionToNotifiedByVal; + + match self.state().transition_to_notified_by_val() { + TransitionToNotifiedByVal::Submit => { + // The caller has given us a ref-count, and the transition has + // created a new ref-count, so we now hold two. We turn the new + // ref-count Notified and pass it to the call to `schedule`. + // + // The old ref-count is retained for now to ensure that the task + // is not dropped during the call to `schedule` if the call + // drops the task it was given. + self.schedule(); + + // Now that we have completed the call to schedule, we can + // release our ref-count. + self.drop_reference(); + } + TransitionToNotifiedByVal::Dealloc => { + self.dealloc(); + } + TransitionToNotifiedByVal::DoNothing => {} + } + } + + /// This call notifies the task. It will not consume any ref-counts, but the + /// caller should hold a ref-count. This will create a new Notified and + /// submit it if necessary. + pub(super) fn wake_by_ref(&self) { + use super::state::TransitionToNotifiedByRef; + + match self.state().transition_to_notified_by_ref() { + TransitionToNotifiedByRef::Submit => { + // The transition above incremented the ref-count for a new task + // and the caller also holds a ref-count. The caller's ref-count + // ensures that the task is not destroyed even if the new task + // is dropped before `schedule` returns. + self.schedule(); + } + TransitionToNotifiedByRef::DoNothing => {} + } + } + + /// Remotely aborts the task. + /// + /// The caller should hold a ref-count, but we do not consume it. + /// + /// This is similar to `shutdown` except that it asks the runtime to perform + /// the shutdown. This is necessary to avoid the shutdown happening in the + /// wrong thread for non-Send tasks. + pub(super) fn remote_abort(&self) { + if self.state().transition_to_notified_and_cancel() { + // The transition has created a new ref-count, which we turn into + // a Notified and pass to the task. + // + // Since the caller holds a ref-count, the task cannot be destroyed + // before the call to `schedule` returns even if the call drops the + // `Notified` internally. + self.schedule(); + } + } + + /// Try to set the waker notified when the task is complete. Returns true if + /// the task has already completed. If this call returns false, then the + /// waker will not be notified. + pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool { + can_read_output(self.header(), self.trailer(), waker) + } +} + impl<T, S> Harness<T, S> where T: Future, S: Schedule, { + pub(super) fn drop_reference(self) { + if self.state().ref_dec() { + self.dealloc(); + } + } + /// Polls the inner future. A ref-count is consumed. /// /// All necessary state checks and transitions are performed. @@ -91,32 +190,32 @@ where fn poll_inner(&self) -> PollFuture { use super::state::{TransitionToIdle, TransitionToRunning}; - match self.header().state.transition_to_running() { + match self.state().transition_to_running() { TransitionToRunning::Success => { - let waker_ref = waker_ref::<T, S>(self.header()); + let header_ptr = self.header_ptr(); + let waker_ref = waker_ref::<T, S>(&header_ptr); let cx = Context::from_waker(&*waker_ref); - let res = poll_future(&self.core().stage, cx); + let res = poll_future(self.core(), cx); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. return PollFuture::Complete; } - match self.header().state.transition_to_idle() { + match self.state().transition_to_idle() { TransitionToIdle::Ok => PollFuture::Done, TransitionToIdle::OkNotified => PollFuture::Notified, TransitionToIdle::OkDealloc => PollFuture::Dealloc, TransitionToIdle::Cancelled => { // The transition to idle failed because the task was // cancelled during the poll. - - cancel_task(&self.core().stage); + cancel_task(self.core()); PollFuture::Complete } } } TransitionToRunning::Cancelled => { - cancel_task(&self.core().stage); + cancel_task(self.core()); PollFuture::Complete } TransitionToRunning::Failed => PollFuture::Done, @@ -131,7 +230,7 @@ where /// there is nothing further to do. When the task completes running, it will /// notice the `CANCELLED` bit and finalize the task. pub(super) fn shutdown(self) { - if !self.header().state.transition_to_shutdown() { + if !self.state().transition_to_shutdown() { // The task is concurrently running. No further work needed. self.drop_reference(); return; @@ -139,7 +238,7 @@ where // By transitioning the lifecycle to `Running`, we have permission to // drop the future. - cancel_task(&self.core().stage); + cancel_task(self.core()); self.complete(); } @@ -150,6 +249,19 @@ where // Check causality self.core().stage.with_mut(drop); + // Safety: The caller of this method just transitioned our ref-count to + // zero, so it is our responsibility to release the allocation. + // + // We don't hold any references into the allocation at this point, but + // it is possible for another thread to still hold a `&State` into the + // allocation if that other thread has decremented its last ref-count, + // but has not yet returned from the relevant method on `State`. + // + // However, the `State` type consists of just an `AtomicUsize`, and an + // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`. + // As explained in the documentation for `UnsafeCell`, such references + // are allowed to be dangling after their last use, even if the + // reference has not yet gone out of scope. unsafe { drop(Box::from_raw(self.cell.as_ptr())); } @@ -160,122 +272,30 @@ where /// Read the task output into `dst`. pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) { if can_read_output(self.header(), self.trailer(), waker) { - *dst = Poll::Ready(self.core().stage.take_output()); + *dst = Poll::Ready(self.core().take_output()); } } pub(super) fn drop_join_handle_slow(self) { - let mut maybe_panic = None; - // Try to unset `JOIN_INTEREST`. This must be done as a first step in // case the task concurrently completed. - if self.header().state.unset_join_interested().is_err() { + if self.state().unset_join_interested().is_err() { // It is our responsibility to drop the output. This is critical as // the task output may not be `Send` and as such must remain with // the scheduler or `JoinHandle`. i.e. if the output remains in the // task structure until the task is deallocated, it may be dropped // by a Waker on any arbitrary thread. - let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { - self.core().stage.drop_future_or_output(); + // + // Panics are delivered to the user via the `JoinHandle`. Given that + // they are dropping the `JoinHandle`, we assume they are not + // interested in the panic and swallow it. + let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + self.core().drop_future_or_output(); })); - - if let Err(panic) = panic { - maybe_panic = Some(panic); - } } // Drop the `JoinHandle` reference, possibly deallocating the task self.drop_reference(); - - if let Some(panic) = maybe_panic { - panic::resume_unwind(panic); - } - } - - /// Remotely aborts the task. - /// - /// The caller should hold a ref-count, but we do not consume it. - /// - /// This is similar to `shutdown` except that it asks the runtime to perform - /// the shutdown. This is necessary to avoid the shutdown happening in the - /// wrong thread for non-Send tasks. - pub(super) fn remote_abort(self) { - if self.header().state.transition_to_notified_and_cancel() { - // The transition has created a new ref-count, which we turn into - // a Notified and pass to the task. - // - // Since the caller holds a ref-count, the task cannot be destroyed - // before the call to `schedule` returns even if the call drops the - // `Notified` internally. - self.core() - .scheduler - .schedule(Notified(self.get_new_task())); - } - } - - // ===== waker behavior ===== - - /// This call consumes a ref-count and notifies the task. This will create a - /// new Notified and submit it if necessary. - /// - /// The caller does not need to hold a ref-count besides the one that was - /// passed to this call. - pub(super) fn wake_by_val(self) { - use super::state::TransitionToNotifiedByVal; - - match self.header().state.transition_to_notified_by_val() { - TransitionToNotifiedByVal::Submit => { - // The caller has given us a ref-count, and the transition has - // created a new ref-count, so we now hold two. We turn the new - // ref-count Notified and pass it to the call to `schedule`. - // - // The old ref-count is retained for now to ensure that the task - // is not dropped during the call to `schedule` if the call - // drops the task it was given. - self.core() - .scheduler - .schedule(Notified(self.get_new_task())); - - // Now that we have completed the call to schedule, we can - // release our ref-count. - self.drop_reference(); - } - TransitionToNotifiedByVal::Dealloc => { - self.dealloc(); - } - TransitionToNotifiedByVal::DoNothing => {} - } - } - - /// This call notifies the task. It will not consume any ref-counts, but the - /// caller should hold a ref-count. This will create a new Notified and - /// submit it if necessary. - pub(super) fn wake_by_ref(&self) { - use super::state::TransitionToNotifiedByRef; - - match self.header().state.transition_to_notified_by_ref() { - TransitionToNotifiedByRef::Submit => { - // The transition above incremented the ref-count for a new task - // and the caller also holds a ref-count. The caller's ref-count - // ensures that the task is not destroyed even if the new task - // is dropped before `schedule` returns. - self.core() - .scheduler - .schedule(Notified(self.get_new_task())); - } - TransitionToNotifiedByRef::DoNothing => {} - } - } - - pub(super) fn drop_reference(self) { - if self.header().state.ref_dec() { - self.dealloc(); - } - } - - #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(super) fn id(&self) -> Option<&tracing::Id> { - self.header().id.as_ref() } // ====== internal ====== @@ -285,7 +305,7 @@ where // The future has completed and its output has been written to the task // stage. We transition from running to complete. - let snapshot = self.header().state.transition_to_complete(); + let snapshot = self.state().transition_to_complete(); // We catch panics here in case dropping the future or waking the // JoinHandle panics. @@ -294,7 +314,7 @@ where // The `JoinHandle` is not interested in the output of // this task. It is our responsibility to drop the // output. - self.core().stage.drop_future_or_output(); + self.core().drop_future_or_output(); } else if snapshot.has_join_waker() { // Notify the join handle. The previous transition obtains the // lock on the waker cell. @@ -305,7 +325,7 @@ where // The task has completed execution and will no longer be scheduled. let num_release = self.release(); - if self.header().state.transition_to_terminal(num_release) { + if self.state().transition_to_terminal(num_release) { self.dealloc(); } } @@ -426,31 +446,31 @@ enum PollFuture { } /// Cancels the task and store the appropriate error in the stage field. -fn cancel_task<T: Future>(stage: &CoreStage<T>) { +fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) { // Drop the future from a panic guard. let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { - stage.drop_future_or_output(); + core.drop_future_or_output(); })); match res { Ok(()) => { - stage.store_output(Err(JoinError::cancelled())); + core.store_output(Err(JoinError::cancelled(core.task_id))); } Err(panic) => { - stage.store_output(Err(JoinError::panic(panic))); + core.store_output(Err(JoinError::panic(core.task_id, panic))); } } } /// Polls the future. If the future completes, the output is written to the /// stage field. -fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> { +fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> { // Poll the future. let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { - struct Guard<'a, T: Future> { - core: &'a CoreStage<T>, + struct Guard<'a, T: Future, S: Schedule> { + core: &'a Core<T, S>, } - impl<'a, T: Future> Drop for Guard<'a, T> { + impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> { fn drop(&mut self) { // If the future panics on poll, we drop it inside the panic // guard. @@ -467,13 +487,20 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> { let output = match output { Ok(Poll::Pending) => return Poll::Pending, Ok(Poll::Ready(output)) => Ok(output), - Err(panic) => Err(JoinError::panic(panic)), + Err(panic) => { + core.scheduler.unhandled_panic(); + Err(JoinError::panic(core.task_id, panic)) + } }; // Catch and ignore panics if the future panics on drop. - let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { core.store_output(output); })); + if res.is_err() { + core.scheduler.unhandled_panic(); + } + Poll::Ready(()) } |