aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/task/state.rs
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-08-17 08:33:38 -0700
committerJoel Galenson <jgalenson@google.com>2021-08-17 08:40:48 -0700
commit642961436a727d51930e5839e3dbfee04ba4af95 (patch)
tree9da006d6d1c0e4667e8d848673b13cc7d2bb62ca /src/runtime/task/state.rs
parent1c33108b3901dd464f81acf08b5268ec294b3876 (diff)
downloadtokio-642961436a727d51930e5839e3dbfee04ba4af95.tar.gz
Upgrade rust/crates/tokio to 1.10.0
Test: make Change-Id: I4ec984178af20297aae0ed51f0b1c6410876a51b
Diffstat (limited to 'src/runtime/task/state.rs')
-rw-r--r--src/runtime/task/state.rs298
1 files changed, 222 insertions, 76 deletions
diff --git a/src/runtime/task/state.rs b/src/runtime/task/state.rs
index 1f08d6d..059a7f9 100644
--- a/src/runtime/task/state.rs
+++ b/src/runtime/task/state.rs
@@ -54,22 +54,52 @@ const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
/// State a task is initialized with
///
-/// A task is initialized with two references: one for the scheduler and one for
-/// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is
-/// set. A new task is immediately pushed into the run queue for execution and
-/// starts with the `NOTIFIED` flag set.
-const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED;
+/// A task is initialized with three references:
+///
+/// * A reference that will be stored in an OwnedTasks or LocalOwnedTasks.
+/// * A reference that will be sent to the scheduler as an ordinary notification.
+/// * A reference for the JoinHandle.
+///
+/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
+/// As the task starts with a `Notified`, `NOTIFIED` is set.
+const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
+
+#[must_use]
+pub(super) enum TransitionToRunning {
+ Success,
+ Cancelled,
+ Failed,
+ Dealloc,
+}
+
+#[must_use]
+pub(super) enum TransitionToIdle {
+ Ok,
+ OkNotified,
+ OkDealloc,
+ Cancelled,
+}
+
+#[must_use]
+pub(super) enum TransitionToNotifiedByVal {
+ DoNothing,
+ Submit,
+ Dealloc,
+}
+
+#[must_use]
+pub(super) enum TransitionToNotifiedByRef {
+ DoNothing,
+ Submit,
+}
/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
/// Return a task's initial state
pub(super) fn new() -> State {
- // A task is initialized with three references: one for the scheduler,
- // one for the `JoinHandle`, one for the task handle made available in
- // release. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is
- // set. A new task is immediately pushed into the run queue for
- // execution and starts with the `NOTIFIED` flag set.
+ // The raw task returned by this method has a ref-count of three. See
+ // the comment on INITIAL_STATE for more.
State {
val: AtomicUsize::new(INITIAL_STATE),
}
@@ -80,57 +110,72 @@ impl State {
Snapshot(self.val.load(Acquire))
}
- /// Attempt to transition the lifecycle to `Running`.
- ///
- /// If `ref_inc` is set, the reference count is also incremented.
- ///
- /// The `NOTIFIED` bit is always unset.
- pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult {
- self.fetch_update(|curr| {
- assert!(curr.is_notified());
-
- let mut next = curr;
+ /// Attempt to transition the lifecycle to `Running`. This sets the
+ /// notified bit to false so notifications during the poll can be detected.
+ pub(super) fn transition_to_running(&self) -> TransitionToRunning {
+ self.fetch_update_action(|mut next| {
+ let action;
+ assert!(next.is_notified());
if !next.is_idle() {
- return None;
- }
-
- if ref_inc {
- next.ref_inc();
+ // This happens if the task is either currently running or if it
+ // has already completed, e.g. if it was cancelled during
+ // shutdown. Consume the ref-count and return.
+ next.ref_dec();
+ if next.ref_count() == 0 {
+ action = TransitionToRunning::Dealloc;
+ } else {
+ action = TransitionToRunning::Failed;
+ }
+ } else {
+ // We are able to lock the RUNNING bit.
+ next.set_running();
+ next.unset_notified();
+
+ if next.is_cancelled() {
+ action = TransitionToRunning::Cancelled;
+ } else {
+ action = TransitionToRunning::Success;
+ }
}
-
- next.set_running();
- next.unset_notified();
- Some(next)
+ (action, Some(next))
})
}
/// Transitions the task from `Running` -> `Idle`.
///
- /// Returns `Ok` if the transition to `Idle` is successful, `Err` otherwise.
- /// In both cases, a snapshot of the state from **after** the transition is
- /// returned.
- ///
+ /// Returns `true` if the transition to `Idle` is successful, `false` otherwise.
/// The transition to `Idle` fails if the task has been flagged to be
/// cancelled.
- pub(super) fn transition_to_idle(&self) -> UpdateResult {
- self.fetch_update(|curr| {
+ pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
+ self.fetch_update_action(|curr| {
assert!(curr.is_running());
if curr.is_cancelled() {
- return None;
+ return (TransitionToIdle::Cancelled, None);
}
let mut next = curr;
+ let action;
next.unset_running();
- if next.is_notified() {
- // The caller needs to schedule the task. To do this, it needs a
- // waker. The waker requires a ref count.
+ if !next.is_notified() {
+ // Polling the future consumes the ref-count of the Notified.
+ next.ref_dec();
+ if next.ref_count() == 0 {
+ action = TransitionToIdle::OkDealloc;
+ } else {
+ action = TransitionToIdle::Ok;
+ }
+ } else {
+ // The caller will schedule a new notification, so we create a
+ // new ref-count for the notification. Our own ref-count is kept
+ // for now, and the caller will drop it shortly.
next.ref_inc();
+ action = TransitionToIdle::OkNotified;
}
- Some(next)
+ (action, Some(next))
})
}
@@ -146,38 +191,119 @@ impl State {
}
/// Transition from `Complete` -> `Terminal`, decrementing the reference
- /// count by 1.
+ /// count the specified number of times.
///
- /// When `ref_dec` is set, an additional ref count decrement is performed.
- /// This is used to batch atomic ops when possible.
- pub(super) fn transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot {
- self.fetch_update(|mut snapshot| {
- if complete {
- snapshot.set_complete();
- } else {
- assert!(snapshot.is_complete());
- }
+ /// Returns true if the task should be deallocated.
+ pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
+ let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
+ assert!(
+ prev.ref_count() >= count,
+ "current: {}, sub: {}",
+ prev.ref_count(),
+ count
+ );
+ prev.ref_count() == count
+ }
+
+ /// Transitions the state to `NOTIFIED`.
+ ///
+ /// If no task needs to be submitted, a ref-count is consumed.
+ ///
+ /// If a task needs to be submitted, the ref-count is incremented for the
+ /// new Notified.
+ pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
+ self.fetch_update_action(|mut snapshot| {
+ let action;
+
+ if snapshot.is_running() {
+ // If the task is running, we mark it as notified, but we should
+ // not submit anything as the thread currently running the
+ // future is responsible for that.
+ snapshot.set_notified();
+ snapshot.ref_dec();
- // Decrement the primary handle
- snapshot.ref_dec();
+ // The thread that set the running bit also holds a ref-count.
+ assert!(snapshot.ref_count() > 0);
- if ref_dec {
- // Decrement a second time
+ action = TransitionToNotifiedByVal::DoNothing;
+ } else if snapshot.is_complete() || snapshot.is_notified() {
+ // We do not need to submit any notifications, but we have to
+ // decrement the ref-count.
snapshot.ref_dec();
+
+ if snapshot.ref_count() == 0 {
+ action = TransitionToNotifiedByVal::Dealloc;
+ } else {
+ action = TransitionToNotifiedByVal::DoNothing;
+ }
+ } else {
+ // We create a new notified that we can submit. The caller
+ // retains ownership of the ref-count they passed in.
+ snapshot.set_notified();
+ snapshot.ref_inc();
+ action = TransitionToNotifiedByVal::Submit;
}
- Some(snapshot)
+ (action, Some(snapshot))
})
- .unwrap()
}
/// Transitions the state to `NOTIFIED`.
+ pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
+ self.fetch_update_action(|mut snapshot| {
+ if snapshot.is_complete() || snapshot.is_notified() {
+ // There is nothing to do in this case.
+ (TransitionToNotifiedByRef::DoNothing, None)
+ } else if snapshot.is_running() {
+ // If the task is running, we mark it as notified, but we should
+ // not submit as the thread currently running the future is
+ // responsible for that.
+ snapshot.set_notified();
+ (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
+ } else {
+ // The task is idle and not notified. We should submit a
+ // notification.
+ snapshot.set_notified();
+ snapshot.ref_inc();
+ (TransitionToNotifiedByRef::Submit, Some(snapshot))
+ }
+ })
+ }
+
+ /// Set the cancelled bit and transition the state to `NOTIFIED` if idle.
///
/// Returns `true` if the task needs to be submitted to the pool for
/// execution
- pub(super) fn transition_to_notified(&self) -> bool {
- let prev = Snapshot(self.val.fetch_or(NOTIFIED, AcqRel));
- prev.will_need_queueing()
+ pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
+ self.fetch_update_action(|mut snapshot| {
+ if snapshot.is_cancelled() || snapshot.is_complete() {
+ // Aborts to completed or cancelled tasks are no-ops.
+ (false, None)
+ } else if snapshot.is_running() {
+ // If the task is running, we mark it as cancelled. The thread
+ // running the task will notice the cancelled bit when it
+ // stops polling and it will kill the task.
+ //
+ // The set_notified() call is not strictly necessary but it will
+ // in some cases let a wake_by_ref call return without having
+ // to perform a compare_exchange.
+ snapshot.set_notified();
+ snapshot.set_cancelled();
+ (false, Some(snapshot))
+ } else {
+ // The task is idle. We set the cancelled and notified bits and
+ // submit a notification if the notified bit was not already
+ // set.
+ snapshot.set_cancelled();
+ if !snapshot.is_notified() {
+ snapshot.set_notified();
+ snapshot.ref_inc();
+ (true, Some(snapshot))
+ } else {
+ (false, Some(snapshot))
+ }
+ }
+ })
}
/// Set the `CANCELLED` bit and attempt to transition to `Running`.
@@ -191,17 +317,11 @@ impl State {
if snapshot.is_idle() {
snapshot.set_running();
-
- if snapshot.is_notified() {
- // If the task is idle and notified, this indicates the task is
- // in the run queue and is considered owned by the scheduler.
- // The shutdown operation claims ownership of the task, which
- // means we need to assign an additional ref-count to the task
- // in the queue.
- snapshot.ref_inc();
- }
}
+ // If the task was not idle, the thread currently running the task
+ // will notice the cancelled bit and cancel it once the poll
+ // completes.
snapshot.set_cancelled();
Some(snapshot)
});
@@ -317,9 +437,39 @@ impl State {
/// Returns `true` if the task should be released.
pub(super) fn ref_dec(&self) -> bool {
let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
+ assert!(prev.ref_count() >= 1);
prev.ref_count() == 1
}
+ /// Returns `true` if the task should be released.
+ pub(super) fn ref_dec_twice(&self) -> bool {
+ let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
+ assert!(prev.ref_count() >= 2);
+ prev.ref_count() == 2
+ }
+
+ fn fetch_update_action<F, T>(&self, mut f: F) -> T
+ where
+ F: FnMut(Snapshot) -> (T, Option<Snapshot>),
+ {
+ let mut curr = self.load();
+
+ loop {
+ let (output, next) = f(curr);
+ let next = match next {
+ Some(next) => next,
+ None => return output,
+ };
+
+ let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => return output,
+ Err(actual) => curr = Snapshot(actual),
+ }
+ }
+ }
+
fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
where
F: FnMut(Snapshot) -> Option<Snapshot>,
@@ -359,6 +509,10 @@ impl Snapshot {
self.0 &= !NOTIFIED
}
+ fn set_notified(&mut self) {
+ self.0 |= NOTIFIED
+ }
+
pub(super) fn is_running(self) -> bool {
self.0 & RUNNING == RUNNING
}
@@ -379,10 +533,6 @@ impl Snapshot {
self.0 |= CANCELLED;
}
- fn set_complete(&mut self) {
- self.0 |= COMPLETE;
- }
-
/// Returns `true` if the task's future has completed execution.
pub(super) fn is_complete(self) -> bool {
self.0 & COMPLETE == COMPLETE
@@ -421,10 +571,6 @@ impl Snapshot {
assert!(self.ref_count() > 0);
self.0 -= REF_ONE
}
-
- fn will_need_queueing(self) -> bool {
- !self.is_notified() && self.is_idle()
- }
}
impl fmt::Debug for State {