aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/task/harness.rs
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2021-02-09 18:18:56 -0800
committerStephen Hines <srhines@google.com>2021-03-03 21:34:52 +0000
commite3d8d80d2d8744ccdcd175323e0864c8f30fcedc (patch)
tree16d053e70d21e456d52f4a7762ee41441342b7a2 /src/runtime/task/harness.rs
parent925d648e545e70d6a4faae3d7efe5e0de885f922 (diff)
downloadtokio-e3d8d80d2d8744ccdcd175323e0864c8f30fcedc.tar.gz
Upgrade rust/crates/tokio to 1.2.0
Test: make Change-Id: Ib0f6a5201b51e9d122b6e867388a3856e16f803a
Diffstat (limited to 'src/runtime/task/harness.rs')
-rw-r--r--src/runtime/task/harness.rs493
1 files changed, 284 insertions, 209 deletions
diff --git a/src/runtime/task/harness.rs b/src/runtime/task/harness.rs
index 208d48c..7d596e3 100644
--- a/src/runtime/task/harness.rs
+++ b/src/runtime/task/harness.rs
@@ -1,12 +1,13 @@
-use crate::runtime::task::core::{Cell, Core, Header, Trailer};
+use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer};
use crate::runtime::task::state::Snapshot;
+use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, Schedule, Task};
use std::future::Future;
use std::mem;
use std::panic;
use std::ptr::NonNull;
-use std::task::{Poll, Waker};
+use std::task::{Context, Poll, Waker};
/// Typed raw task handle
pub(super) struct Harness<T: Future, S: 'static> {
@@ -35,6 +36,13 @@ where
fn core(&self) -> &Core<T, S> {
unsafe { &self.cell.as_ref().core }
}
+
+ fn scheduler_view(&self) -> SchedulerView<'_, S> {
+ SchedulerView {
+ header: self.header(),
+ scheduler: &self.core().scheduler,
+ }
+ }
}
impl<T, S> Harness<T, S>
@@ -48,102 +56,46 @@ where
///
/// Panics raised while polling the future are handled.
pub(super) fn poll(self) {
- // If this is the first time the task is polled, the task will be bound
- // to the scheduler, in which case the task ref count must be
- // incremented.
- let is_not_bound = !self.core().is_bound();
-
- // Transition the task to the running state.
- //
- // A failure to transition here indicates the task has been cancelled
- // while in the run queue pending execution.
- let snapshot = match self.header().state.transition_to_running(is_not_bound) {
- Ok(snapshot) => snapshot,
- Err(_) => {
- // The task was shutdown while in the run queue. At this point,
- // we just hold a ref counted reference. Drop it here.
+ match self.poll_inner() {
+ PollFuture::Notified => {
+ // Signal yield
+ self.core().scheduler.yield_now(Notified(self.to_task()));
+ // The ref-count was incremented as part of
+ // `transition_to_idle`.
self.drop_reference();
- return;
}
- };
-
- if is_not_bound {
- // Ensure the task is bound to a scheduler instance. Since this is
- // the first time polling the task, a scheduler instance is pulled
- // from the local context and assigned to the task.
- //
- // The scheduler maintains ownership of the task and responds to
- // `wake` calls.
- //
- // The task reference count has been incremented.
- //
- // Safety: Since we have unique access to the task so that we can
- // safely call `bind_scheduler`.
- self.core().bind_scheduler(self.to_task());
+ PollFuture::DropReference => {
+ self.drop_reference();
+ }
+ PollFuture::Complete(out, is_join_interested) => {
+ self.complete(out, is_join_interested);
+ }
+ PollFuture::None => (),
}
+ }
+
+ fn poll_inner(&self) -> PollFuture<T::Output> {
+ let snapshot = match self.scheduler_view().transition_to_running() {
+ TransitionToRunning::Ok(snapshot) => snapshot,
+ TransitionToRunning::DropReference => return PollFuture::DropReference,
+ };
// The transition to `Running` done above ensures that a lock on the
// future has been obtained. This also ensures the `*mut T` pointer
// contains the future (as opposed to the output) and is initialized.
- let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- struct Guard<'a, T: Future, S: Schedule> {
- core: &'a Core<T, S>,
- }
-
- impl<T: Future, S: Schedule> Drop for Guard<'_, T, S> {
- fn drop(&mut self) {
- self.core.drop_future_or_output();
- }
- }
-
- let guard = Guard { core: self.core() };
-
- // If the task is cancelled, avoid polling it, instead signalling it
- // is complete.
- if snapshot.is_cancelled() {
- Poll::Ready(Err(JoinError::cancelled()))
- } else {
- let res = guard.core.poll(self.header());
-
- // prevent the guard from dropping the future
- mem::forget(guard);
-
- res.map(Ok)
- }
- }));
-
- match res {
- Ok(Poll::Ready(out)) => {
- self.complete(out, snapshot.is_join_interested());
- }
- Ok(Poll::Pending) => {
- match self.header().state.transition_to_idle() {
- Ok(snapshot) => {
- if snapshot.is_notified() {
- // Signal yield
- self.core().yield_now(Notified(self.to_task()));
- // The ref-count was incremented as part of
- // `transition_to_idle`.
- self.drop_reference();
- }
- }
- Err(_) => self.cancel_task(),
- }
- }
- Err(err) => {
- self.complete(Err(JoinError::panic(err)), snapshot.is_join_interested());
- }
- }
+ let waker_ref = waker_ref::<T, S>(self.header());
+ let cx = Context::from_waker(&*waker_ref);
+ poll_future(self.header(), &self.core().stage, snapshot, cx)
}
pub(super) fn dealloc(self) {
// Release the join waker, if there is one.
- self.trailer().waker.with_mut(|_| ());
+ self.trailer().waker.with_mut(drop);
// Check causality
- self.core().stage.with_mut(|_| {});
- self.core().scheduler.with_mut(|_| {});
+ self.core().stage.with_mut(drop);
+ self.core().scheduler.with_mut(drop);
unsafe {
drop(Box::from_raw(self.cell.as_ptr()));
@@ -154,83 +106,9 @@ where
/// Read the task output into `dst`.
pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
- // Load a snapshot of the current task state
- let snapshot = self.header().state.load();
-
- debug_assert!(snapshot.is_join_interested());
-
- if !snapshot.is_complete() {
- // The waker must be stored in the task struct.
- let res = if snapshot.has_join_waker() {
- // There already is a waker stored in the struct. If it matches
- // the provided waker, then there is no further work to do.
- // Otherwise, the waker must be swapped.
- let will_wake = unsafe {
- // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
- // may mutate the `waker` field.
- self.trailer()
- .waker
- .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
- };
-
- if will_wake {
- // The task is not complete **and** the waker is up to date,
- // there is nothing further that needs to be done.
- return;
- }
-
- // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
- // field then update the field with the new join worker.
- //
- // This requires two atomic operations, unsetting the bit and
- // then resetting it. If the task transitions to complete
- // concurrently to either one of those operations, then setting
- // the join waker fails and we proceed to reading the task
- // output.
- self.header()
- .state
- .unset_waker()
- .and_then(|snapshot| self.set_join_waker(waker.clone(), snapshot))
- } else {
- self.set_join_waker(waker.clone(), snapshot)
- };
-
- match res {
- Ok(_) => return,
- Err(snapshot) => {
- assert!(snapshot.is_complete());
- }
- }
+ if can_read_output(self.header(), self.trailer(), waker) {
+ *dst = Poll::Ready(self.core().stage.take_output());
}
-
- *dst = Poll::Ready(self.core().take_output());
- }
-
- fn set_join_waker(&self, waker: Waker, snapshot: Snapshot) -> Result<Snapshot, Snapshot> {
- assert!(snapshot.is_join_interested());
- assert!(!snapshot.has_join_waker());
-
- // Safety: Only the `JoinHandle` may set the `waker` field. When
- // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
- unsafe {
- self.trailer().waker.with_mut(|ptr| {
- *ptr = Some(waker);
- });
- }
-
- // Update the `JoinWaker` state accordingly
- let res = self.header().state.set_join_waker();
-
- // If the state could not be updated, then clear the join waker
- if res.is_err() {
- unsafe {
- self.trailer().waker.with_mut(|ptr| {
- *ptr = None;
- });
- }
- }
-
- res
}
pub(super) fn drop_join_handle_slow(self) {
@@ -242,7 +120,7 @@ where
// the scheduler or `JoinHandle`. i.e. if the output remains in the
// task structure until the task is deallocated, it may be dropped
// by a Waker on any arbitrary thread.
- self.core().drop_future_or_output();
+ self.core().stage.drop_future_or_output();
}
// Drop the `JoinHandle` reference, possibly deallocating the task
@@ -258,7 +136,7 @@ where
pub(super) fn wake_by_ref(&self) {
if self.header().state.transition_to_notified() {
- self.core().schedule(Notified(self.to_task()));
+ self.core().scheduler.schedule(Notified(self.to_task()));
}
}
@@ -282,44 +160,65 @@ where
// By transitioning the lifcycle to `Running`, we have permission to
// drop the future.
- self.cancel_task();
+ let err = cancel_task(&self.core().stage);
+ self.complete(Err(err), true)
}
// ====== internal ======
- fn cancel_task(self) {
- // Drop the future from a panic guard.
- let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- self.core().drop_future_or_output();
- }));
-
- if let Err(err) = res {
- // Dropping the future panicked, complete the join
- // handle with the panic to avoid dropping the panic
- // on the ground.
- self.complete(Err(JoinError::panic(err)), true);
- } else {
- self.complete(Err(JoinError::cancelled()), true);
- }
- }
-
- fn complete(mut self, output: super::Result<T::Output>, is_join_interested: bool) {
+ fn complete(self, output: super::Result<T::Output>, is_join_interested: bool) {
if is_join_interested {
// Store the output. The future has already been dropped
//
// Safety: Mutual exclusion is obtained by having transitioned the task
// state -> Running
- self.core().store_output(output);
+ let stage = &self.core().stage;
+ stage.store_output(output);
// Transition to `Complete`, notifying the `JoinHandle` if necessary.
- self.transition_to_complete();
+ transition_to_complete(self.header(), stage, &self.trailer());
}
// The task has completed execution and will no longer be scheduled.
//
// Attempts to batch a ref-dec with the state transition below.
- let ref_dec = if self.core().is_bound() {
- if let Some(task) = self.core().release(self.to_task()) {
+
+ if self
+ .scheduler_view()
+ .transition_to_terminal(is_join_interested)
+ {
+ self.dealloc()
+ }
+ }
+
+ fn to_task(&self) -> Task<S> {
+ self.scheduler_view().to_task()
+ }
+}
+
+enum TransitionToRunning {
+ Ok(Snapshot),
+ DropReference,
+}
+
+struct SchedulerView<'a, S> {
+ header: &'a Header,
+ scheduler: &'a Scheduler<S>,
+}
+
+impl<'a, S> SchedulerView<'a, S>
+where
+ S: Schedule,
+{
+ fn to_task(&self) -> Task<S> {
+ // SAFETY The header is from the same struct containing the scheduler `S` so the cast is safe
+ unsafe { Task::from_raw(self.header.into()) }
+ }
+
+ /// Returns true if the task should be deallocated.
+ fn transition_to_terminal(&self, is_join_interested: bool) -> bool {
+ let ref_dec = if self.scheduler.is_bound() {
+ if let Some(task) = self.scheduler.release(self.to_task()) {
mem::forget(task);
true
} else {
@@ -331,41 +230,217 @@ where
// This might deallocate
let snapshot = self
- .header()
+ .header
.state
.transition_to_terminal(!is_join_interested, ref_dec);
- if snapshot.ref_count() == 0 {
- self.dealloc()
+ snapshot.ref_count() == 0
+ }
+
+ fn transition_to_running(&self) -> TransitionToRunning {
+ // If this is the first time the task is polled, the task will be bound
+ // to the scheduler, in which case the task ref count must be
+ // incremented.
+ let is_not_bound = !self.scheduler.is_bound();
+
+ // Transition the task to the running state.
+ //
+ // A failure to transition here indicates the task has been cancelled
+ // while in the run queue pending execution.
+ let snapshot = match self.header.state.transition_to_running(is_not_bound) {
+ Ok(snapshot) => snapshot,
+ Err(_) => {
+ // The task was shutdown while in the run queue. At this point,
+ // we just hold a ref counted reference. Since we do not have access to it here
+ // return `DropReference` so the caller drops it.
+ return TransitionToRunning::DropReference;
+ }
+ };
+
+ if is_not_bound {
+ // Ensure the task is bound to a scheduler instance. Since this is
+ // the first time polling the task, a scheduler instance is pulled
+ // from the local context and assigned to the task.
+ //
+ // The scheduler maintains ownership of the task and responds to
+ // `wake` calls.
+ //
+ // The task reference count has been incremented.
+ //
+ // Safety: Since we have unique access to the task so that we can
+ // safely call `bind_scheduler`.
+ self.scheduler.bind_scheduler(self.to_task());
}
+ TransitionToRunning::Ok(snapshot)
}
+}
- /// Transitions the task's lifecycle to `Complete`. Notifies the
- /// `JoinHandle` if it still has interest in the completion.
- fn transition_to_complete(&mut self) {
- // Transition the task's lifecycle to `Complete` and get a snapshot of
- // the task's sate.
- let snapshot = self.header().state.transition_to_complete();
-
- if !snapshot.is_join_interested() {
- // The `JoinHandle` is not interested in the output of this task. It
- // is our responsibility to drop the output.
- self.core().drop_future_or_output();
- } else if snapshot.has_join_waker() {
- // Notify the join handle. The previous transition obtains the
- // lock on the waker cell.
- self.wake_join();
+/// Transitions the task's lifecycle to `Complete`. Notifies the
+/// `JoinHandle` if it still has interest in the completion.
+fn transition_to_complete<T>(header: &Header, stage: &CoreStage<T>, trailer: &Trailer)
+where
+ T: Future,
+{
+ // Transition the task's lifecycle to `Complete` and get a snapshot of
+ // the task's sate.
+ let snapshot = header.state.transition_to_complete();
+
+ if !snapshot.is_join_interested() {
+ // The `JoinHandle` is not interested in the output of this task. It
+ // is our responsibility to drop the output.
+ stage.drop_future_or_output();
+ } else if snapshot.has_join_waker() {
+ // Notify the join handle. The previous transition obtains the
+ // lock on the waker cell.
+ trailer.wake_join();
+ }
+}
+
+fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
+ // Load a snapshot of the current task state
+ let snapshot = header.state.load();
+
+ debug_assert!(snapshot.is_join_interested());
+
+ if !snapshot.is_complete() {
+ // The waker must be stored in the task struct.
+ let res = if snapshot.has_join_waker() {
+ // There already is a waker stored in the struct. If it matches
+ // the provided waker, then there is no further work to do.
+ // Otherwise, the waker must be swapped.
+ let will_wake = unsafe {
+ // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
+ // may mutate the `waker` field.
+ trailer.will_wake(waker)
+ };
+
+ if will_wake {
+ // The task is not complete **and** the waker is up to date,
+ // there is nothing further that needs to be done.
+ return false;
+ }
+
+ // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
+ // field then update the field with the new join worker.
+ //
+ // This requires two atomic operations, unsetting the bit and
+ // then resetting it. If the task transitions to complete
+ // concurrently to either one of those operations, then setting
+ // the join waker fails and we proceed to reading the task
+ // output.
+ header
+ .state
+ .unset_waker()
+ .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
+ } else {
+ set_join_waker(header, trailer, waker.clone(), snapshot)
+ };
+
+ match res {
+ Ok(_) => return false,
+ Err(snapshot) => {
+ assert!(snapshot.is_complete());
+ }
}
}
+ true
+}
- fn wake_join(&self) {
- self.trailer().waker.with(|ptr| match unsafe { &*ptr } {
- Some(waker) => waker.wake_by_ref(),
- None => panic!("waker missing"),
- });
+fn set_join_waker(
+ header: &Header,
+ trailer: &Trailer,
+ waker: Waker,
+ snapshot: Snapshot,
+) -> Result<Snapshot, Snapshot> {
+ assert!(snapshot.is_join_interested());
+ assert!(!snapshot.has_join_waker());
+
+ // Safety: Only the `JoinHandle` may set the `waker` field. When
+ // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
+ unsafe {
+ trailer.set_waker(Some(waker));
}
- fn to_task(&self) -> Task<S> {
- unsafe { Task::from_raw(self.header().into()) }
+ // Update the `JoinWaker` state accordingly
+ let res = header.state.set_join_waker();
+
+ // If the state could not be updated, then clear the join waker
+ if res.is_err() {
+ unsafe {
+ trailer.set_waker(None);
+ }
+ }
+
+ res
+}
+
+enum PollFuture<T> {
+ Complete(Result<T, JoinError>, bool),
+ DropReference,
+ Notified,
+ None,
+}
+
+fn cancel_task<T: Future>(stage: &CoreStage<T>) -> JoinError {
+ // Drop the future from a panic guard.
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ stage.drop_future_or_output();
+ }));
+
+ if let Err(err) = res {
+ // Dropping the future panicked, complete the join
+ // handle with the panic to avoid dropping the panic
+ // on the ground.
+ JoinError::panic(err)
+ } else {
+ JoinError::cancelled()
+ }
+}
+
+fn poll_future<T: Future>(
+ header: &Header,
+ core: &CoreStage<T>,
+ snapshot: Snapshot,
+ cx: Context<'_>,
+) -> PollFuture<T::Output> {
+ if snapshot.is_cancelled() {
+ PollFuture::Complete(Err(JoinError::cancelled()), snapshot.is_join_interested())
+ } else {
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ struct Guard<'a, T: Future> {
+ core: &'a CoreStage<T>,
+ }
+
+ impl<T: Future> Drop for Guard<'_, T> {
+ fn drop(&mut self) {
+ self.core.drop_future_or_output();
+ }
+ }
+
+ let guard = Guard { core };
+
+ let res = guard.core.poll(cx);
+
+ // prevent the guard from dropping the future
+ mem::forget(guard);
+
+ res
+ }));
+ match res {
+ Ok(Poll::Pending) => match header.state.transition_to_idle() {
+ Ok(snapshot) => {
+ if snapshot.is_notified() {
+ PollFuture::Notified
+ } else {
+ PollFuture::None
+ }
+ }
+ Err(_) => PollFuture::Complete(Err(cancel_task(core)), true),
+ },
+ Ok(Poll::Ready(ok)) => PollFuture::Complete(Ok(ok), snapshot.is_join_interested()),
+ Err(err) => {
+ PollFuture::Complete(Err(JoinError::panic(err)), snapshot.is_join_interested())
+ }
+ }
}
}