aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/task/harness.rs
diff options
context:
space:
mode:
authorChris Wailes <chriswailes@google.com>2022-12-12 11:43:41 -0800
committerJeff Vander Stoep <jeffv@google.com>2023-01-18 19:48:52 +0100
commitff62579fde0625f6c8923b58c9dc848c97c680e6 (patch)
treec049adb6c0fca041cbb303c8311c1084e4a832cd /src/runtime/task/harness.rs
parentb669ae94fdcda726d88936d028d35187bf41b016 (diff)
downloadtokio-ff62579fde0625f6c8923b58c9dc848c97c680e6.tar.gz
Upgrade tokio to 1.23.0
This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update rust/crates/tokio For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md Test: TreeHugger Change-Id: Id69553d5e858bddcde0de5b9e72d6bb3c08bafb5
Diffstat (limited to 'src/runtime/task/harness.rs')
-rw-r--r--src/runtime/task/harness.rs279
1 files changed, 153 insertions, 126 deletions
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs
index 0996e52..c079297 100644
--- a/src/runtime/task/harness.rs
+++ b/src/runtime/task/harness.rs
@@ -1,8 +1,8 @@
use crate::future::Future;
-use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Trailer};
-use crate::runtime::task::state::Snapshot;
+use crate::runtime::task::core::{Cell, Core, Header, Trailer};
+use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
-use crate::runtime::task::{JoinError, Notified, Schedule, Task};
+use crate::runtime::task::{JoinError, Notified, RawTask, Schedule, Task};
use std::mem;
use std::mem::ManuallyDrop;
@@ -26,8 +26,16 @@ where
}
}
+ fn header_ptr(&self) -> NonNull<Header> {
+ self.cell.cast()
+ }
+
fn header(&self) -> &Header {
- unsafe { &self.cell.as_ref().header }
+ unsafe { &*self.header_ptr().as_ptr() }
+ }
+
+ fn state(&self) -> &State {
+ &self.header().state
}
fn trailer(&self) -> &Trailer {
@@ -39,11 +47,102 @@ where
}
}
+/// Task operations that can be implemented without being generic over the
+/// scheduler or task. Only one version of these methods should exist in the
+/// final binary.
+impl RawTask {
+ pub(super) fn drop_reference(self) {
+ if self.state().ref_dec() {
+ self.dealloc();
+ }
+ }
+
+ /// This call consumes a ref-count and notifies the task. This will create a
+ /// new Notified and submit it if necessary.
+ ///
+ /// The caller does not need to hold a ref-count besides the one that was
+ /// passed to this call.
+ pub(super) fn wake_by_val(&self) {
+ use super::state::TransitionToNotifiedByVal;
+
+ match self.state().transition_to_notified_by_val() {
+ TransitionToNotifiedByVal::Submit => {
+ // The caller has given us a ref-count, and the transition has
+ // created a new ref-count, so we now hold two. We turn the new
+ // ref-count Notified and pass it to the call to `schedule`.
+ //
+ // The old ref-count is retained for now to ensure that the task
+ // is not dropped during the call to `schedule` if the call
+ // drops the task it was given.
+ self.schedule();
+
+ // Now that we have completed the call to schedule, we can
+ // release our ref-count.
+ self.drop_reference();
+ }
+ TransitionToNotifiedByVal::Dealloc => {
+ self.dealloc();
+ }
+ TransitionToNotifiedByVal::DoNothing => {}
+ }
+ }
+
+ /// This call notifies the task. It will not consume any ref-counts, but the
+ /// caller should hold a ref-count. This will create a new Notified and
+ /// submit it if necessary.
+ pub(super) fn wake_by_ref(&self) {
+ use super::state::TransitionToNotifiedByRef;
+
+ match self.state().transition_to_notified_by_ref() {
+ TransitionToNotifiedByRef::Submit => {
+ // The transition above incremented the ref-count for a new task
+ // and the caller also holds a ref-count. The caller's ref-count
+ // ensures that the task is not destroyed even if the new task
+ // is dropped before `schedule` returns.
+ self.schedule();
+ }
+ TransitionToNotifiedByRef::DoNothing => {}
+ }
+ }
+
+ /// Remotely aborts the task.
+ ///
+ /// The caller should hold a ref-count, but we do not consume it.
+ ///
+ /// This is similar to `shutdown` except that it asks the runtime to perform
+ /// the shutdown. This is necessary to avoid the shutdown happening in the
+ /// wrong thread for non-Send tasks.
+ pub(super) fn remote_abort(&self) {
+ if self.state().transition_to_notified_and_cancel() {
+ // The transition has created a new ref-count, which we turn into
+ // a Notified and pass to the task.
+ //
+ // Since the caller holds a ref-count, the task cannot be destroyed
+ // before the call to `schedule` returns even if the call drops the
+ // `Notified` internally.
+ self.schedule();
+ }
+ }
+
+ /// Try to set the waker notified when the task is complete. Returns true if
+ /// the task has already completed. If this call returns false, then the
+ /// waker will not be notified.
+ pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
+ can_read_output(self.header(), self.trailer(), waker)
+ }
+}
+
impl<T, S> Harness<T, S>
where
T: Future,
S: Schedule,
{
+ pub(super) fn drop_reference(self) {
+ if self.state().ref_dec() {
+ self.dealloc();
+ }
+ }
+
/// Polls the inner future. A ref-count is consumed.
///
/// All necessary state checks and transitions are performed.
@@ -91,32 +190,32 @@ where
fn poll_inner(&self) -> PollFuture {
use super::state::{TransitionToIdle, TransitionToRunning};
- match self.header().state.transition_to_running() {
+ match self.state().transition_to_running() {
TransitionToRunning::Success => {
- let waker_ref = waker_ref::<T, S>(self.header());
+ let header_ptr = self.header_ptr();
+ let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
- let res = poll_future(&self.core().stage, cx);
+ let res = poll_future(self.core(), cx);
if res == Poll::Ready(()) {
// The future completed. Move on to complete the task.
return PollFuture::Complete;
}
- match self.header().state.transition_to_idle() {
+ match self.state().transition_to_idle() {
TransitionToIdle::Ok => PollFuture::Done,
TransitionToIdle::OkNotified => PollFuture::Notified,
TransitionToIdle::OkDealloc => PollFuture::Dealloc,
TransitionToIdle::Cancelled => {
// The transition to idle failed because the task was
// cancelled during the poll.
-
- cancel_task(&self.core().stage);
+ cancel_task(self.core());
PollFuture::Complete
}
}
}
TransitionToRunning::Cancelled => {
- cancel_task(&self.core().stage);
+ cancel_task(self.core());
PollFuture::Complete
}
TransitionToRunning::Failed => PollFuture::Done,
@@ -131,7 +230,7 @@ where
/// there is nothing further to do. When the task completes running, it will
/// notice the `CANCELLED` bit and finalize the task.
pub(super) fn shutdown(self) {
- if !self.header().state.transition_to_shutdown() {
+ if !self.state().transition_to_shutdown() {
// The task is concurrently running. No further work needed.
self.drop_reference();
return;
@@ -139,7 +238,7 @@ where
// By transitioning the lifecycle to `Running`, we have permission to
// drop the future.
- cancel_task(&self.core().stage);
+ cancel_task(self.core());
self.complete();
}
@@ -150,6 +249,19 @@ where
// Check causality
self.core().stage.with_mut(drop);
+ // Safety: The caller of this method just transitioned our ref-count to
+ // zero, so it is our responsibility to release the allocation.
+ //
+ // We don't hold any references into the allocation at this point, but
+ // it is possible for another thread to still hold a `&State` into the
+ // allocation if that other thread has decremented its last ref-count,
+ // but has not yet returned from the relevant method on `State`.
+ //
+ // However, the `State` type consists of just an `AtomicUsize`, and an
+ // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
+ // As explained in the documentation for `UnsafeCell`, such references
+ // are allowed to be dangling after their last use, even if the
+ // reference has not yet gone out of scope.
unsafe {
drop(Box::from_raw(self.cell.as_ptr()));
}
@@ -160,122 +272,30 @@ where
/// Read the task output into `dst`.
pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
if can_read_output(self.header(), self.trailer(), waker) {
- *dst = Poll::Ready(self.core().stage.take_output());
+ *dst = Poll::Ready(self.core().take_output());
}
}
pub(super) fn drop_join_handle_slow(self) {
- let mut maybe_panic = None;
-
// Try to unset `JOIN_INTEREST`. This must be done as a first step in
// case the task concurrently completed.
- if self.header().state.unset_join_interested().is_err() {
+ if self.state().unset_join_interested().is_err() {
// It is our responsibility to drop the output. This is critical as
// the task output may not be `Send` and as such must remain with
// the scheduler or `JoinHandle`. i.e. if the output remains in the
// task structure until the task is deallocated, it may be dropped
// by a Waker on any arbitrary thread.
- let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- self.core().stage.drop_future_or_output();
+ //
+ // Panics are delivered to the user via the `JoinHandle`. Given that
+ // they are dropping the `JoinHandle`, we assume they are not
+ // interested in the panic and swallow it.
+ let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ self.core().drop_future_or_output();
}));
-
- if let Err(panic) = panic {
- maybe_panic = Some(panic);
- }
}
// Drop the `JoinHandle` reference, possibly deallocating the task
self.drop_reference();
-
- if let Some(panic) = maybe_panic {
- panic::resume_unwind(panic);
- }
- }
-
- /// Remotely aborts the task.
- ///
- /// The caller should hold a ref-count, but we do not consume it.
- ///
- /// This is similar to `shutdown` except that it asks the runtime to perform
- /// the shutdown. This is necessary to avoid the shutdown happening in the
- /// wrong thread for non-Send tasks.
- pub(super) fn remote_abort(self) {
- if self.header().state.transition_to_notified_and_cancel() {
- // The transition has created a new ref-count, which we turn into
- // a Notified and pass to the task.
- //
- // Since the caller holds a ref-count, the task cannot be destroyed
- // before the call to `schedule` returns even if the call drops the
- // `Notified` internally.
- self.core()
- .scheduler
- .schedule(Notified(self.get_new_task()));
- }
- }
-
- // ===== waker behavior =====
-
- /// This call consumes a ref-count and notifies the task. This will create a
- /// new Notified and submit it if necessary.
- ///
- /// The caller does not need to hold a ref-count besides the one that was
- /// passed to this call.
- pub(super) fn wake_by_val(self) {
- use super::state::TransitionToNotifiedByVal;
-
- match self.header().state.transition_to_notified_by_val() {
- TransitionToNotifiedByVal::Submit => {
- // The caller has given us a ref-count, and the transition has
- // created a new ref-count, so we now hold two. We turn the new
- // ref-count Notified and pass it to the call to `schedule`.
- //
- // The old ref-count is retained for now to ensure that the task
- // is not dropped during the call to `schedule` if the call
- // drops the task it was given.
- self.core()
- .scheduler
- .schedule(Notified(self.get_new_task()));
-
- // Now that we have completed the call to schedule, we can
- // release our ref-count.
- self.drop_reference();
- }
- TransitionToNotifiedByVal::Dealloc => {
- self.dealloc();
- }
- TransitionToNotifiedByVal::DoNothing => {}
- }
- }
-
- /// This call notifies the task. It will not consume any ref-counts, but the
- /// caller should hold a ref-count. This will create a new Notified and
- /// submit it if necessary.
- pub(super) fn wake_by_ref(&self) {
- use super::state::TransitionToNotifiedByRef;
-
- match self.header().state.transition_to_notified_by_ref() {
- TransitionToNotifiedByRef::Submit => {
- // The transition above incremented the ref-count for a new task
- // and the caller also holds a ref-count. The caller's ref-count
- // ensures that the task is not destroyed even if the new task
- // is dropped before `schedule` returns.
- self.core()
- .scheduler
- .schedule(Notified(self.get_new_task()));
- }
- TransitionToNotifiedByRef::DoNothing => {}
- }
- }
-
- pub(super) fn drop_reference(self) {
- if self.header().state.ref_dec() {
- self.dealloc();
- }
- }
-
- #[cfg(all(tokio_unstable, feature = "tracing"))]
- pub(super) fn id(&self) -> Option<&tracing::Id> {
- self.header().id.as_ref()
}
// ====== internal ======
@@ -285,7 +305,7 @@ where
// The future has completed and its output has been written to the task
// stage. We transition from running to complete.
- let snapshot = self.header().state.transition_to_complete();
+ let snapshot = self.state().transition_to_complete();
// We catch panics here in case dropping the future or waking the
// JoinHandle panics.
@@ -294,7 +314,7 @@ where
// The `JoinHandle` is not interested in the output of
// this task. It is our responsibility to drop the
// output.
- self.core().stage.drop_future_or_output();
+ self.core().drop_future_or_output();
} else if snapshot.has_join_waker() {
// Notify the join handle. The previous transition obtains the
// lock on the waker cell.
@@ -305,7 +325,7 @@ where
// The task has completed execution and will no longer be scheduled.
let num_release = self.release();
- if self.header().state.transition_to_terminal(num_release) {
+ if self.state().transition_to_terminal(num_release) {
self.dealloc();
}
}
@@ -426,31 +446,31 @@ enum PollFuture {
}
/// Cancels the task and store the appropriate error in the stage field.
-fn cancel_task<T: Future>(stage: &CoreStage<T>) {
+fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
// Drop the future from a panic guard.
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- stage.drop_future_or_output();
+ core.drop_future_or_output();
}));
match res {
Ok(()) => {
- stage.store_output(Err(JoinError::cancelled()));
+ core.store_output(Err(JoinError::cancelled(core.task_id)));
}
Err(panic) => {
- stage.store_output(Err(JoinError::panic(panic)));
+ core.store_output(Err(JoinError::panic(core.task_id, panic)));
}
}
}
/// Polls the future. If the future completes, the output is written to the
/// stage field.
-fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
+fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
// Poll the future.
let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- struct Guard<'a, T: Future> {
- core: &'a CoreStage<T>,
+ struct Guard<'a, T: Future, S: Schedule> {
+ core: &'a Core<T, S>,
}
- impl<'a, T: Future> Drop for Guard<'a, T> {
+ impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
fn drop(&mut self) {
// If the future panics on poll, we drop it inside the panic
// guard.
@@ -467,13 +487,20 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> {
let output = match output {
Ok(Poll::Pending) => return Poll::Pending,
Ok(Poll::Ready(output)) => Ok(output),
- Err(panic) => Err(JoinError::panic(panic)),
+ Err(panic) => {
+ core.scheduler.unhandled_panic();
+ Err(JoinError::panic(core.task_id, panic))
+ }
};
// Catch and ignore panics if the future panics on drop.
- let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
core.store_output(output);
}));
+ if res.is_err() {
+ core.scheduler.unhandled_panic();
+ }
+
Poll::Ready(())
}